-
Notifications
You must be signed in to change notification settings - Fork 0
/
txrx.rs
171 lines (160 loc) · 5.1 KB
/
txrx.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
use super::{traits::PhyPacket, FrameDetector, FramePayload, Modem, PreambleGen};
use crate::{
traits::{InStream, OutStream, PacketReceiver, PacketSender, Sample, FP},
DefaultConfig,
};
use crossbeam::channel::{unbounded as unbounded_channel, Receiver, Sender};
use std::{
marker::PhantomData,
thread::{self, JoinHandle},
time::{Duration, Instant},
};
/// A send-only PHY layer object.
///
/// Type parameters:
/// - PG: preamble generator
/// - MM: modulator/demodulator
/// - SS: sample output stream the frames are written to
/// - E: error type of the sample output stream
pub struct PhySender<PG, MM, SS, E> {
/// Marker only: `PG` is used through its associated items and never stored.
_pg: PhantomData<PG>,
/// Marker only: ties the stream error type `E` to this sender.
_err: PhantomData<E>,
/// Preamble samples, generated once in `new` and prepended to every frame.
preamble_samples: Vec<FP>,
/// Modem used to modulate packet bytes into payload samples.
modem: MM,
/// Underlying stream every assembled frame is written to.
stream_out: SS,
}
impl<PG, MM, SS, E> PhySender<PG, MM, SS, E>
where
    PG: PreambleGen,
    MM: Modem,
    SS: OutStream<FP, E>,
{
    /// Number of samples in one frame: the preamble followed by the modulated payload.
    pub const SAMPLES_PER_PACKET: usize = PG::PREAMBLE_LEN + MM::SAMPLES_PER_PACKET;

    /// Creates a sender that writes frames to `stream_out`, modulating packets with `modem`.
    ///
    /// The preamble is generated a single time here and cached, so sending a packet only
    /// has to copy it in front of the payload.
    pub fn new(stream_out: SS, modem: MM) -> Self {
        Self {
            _pg: PhantomData,
            _err: PhantomData,
            preamble_samples: PG::generate().samples(),
            modem,
            stream_out,
        }
    }
}
impl<PG, MM, SS, E> PacketSender<PhyPacket, E> for PhySender<PG, MM, SS, E>
where
    PG: PreambleGen,
    MM: Modem,
    SS: OutStream<FP, E>,
{
    /// Sends one packet as a single frame: `frame = preamble + payload`.
    /// - preamble: the predefined samples cached at construction
    /// - payload: output of modulating the packet bytes
    ///
    /// Both parts are assembled in one buffer and handed to the stream with a single
    /// `write_exact` call, after which the stream's `wait` is invoked.
    ///
    /// NOTE(review): earlier documentation described a leading "warm up" section of
    /// random samples whose absolute value is close to 1.0; this method does not emit
    /// one. Confirm whether it is added elsewhere or is no longer needed.
    ///
    /// # Errors
    /// Returns the stream error if writing the frame fails; `wait` is not called then.
    ///
    /// # Panics
    /// Panics if `packet.len()` differs from `MM::BYTES_PER_PACKET`.
    fn send(&mut self, packet: PhyPacket) -> Result<(), E> {
        assert_eq!(packet.len(), MM::BYTES_PER_PACKET);

        // One allocation sized for the whole frame.
        let mut frame = Vec::with_capacity(Self::SAMPLES_PER_PACKET);
        frame.extend(&self.preamble_samples);
        frame.extend(self.modem.modulate(&packet));

        self.stream_out.write_exact(&frame)?;
        self.stream_out.wait();
        Ok(())
    }
}
/// A receive-only PHY layer object.
///
/// Samples are consumed by a background worker thread; detected frame payloads are
/// handed over through a channel and demodulated when a packet is requested.
///
/// Type parameters:
/// - PG: preamble generator
/// - MM: modulation encoder/decoder
/// - FD: frame detector
/// - SS: sample input stream the worker reads from
/// - E: error type of the sample input stream
pub struct PhyReceiver<PG, MM, FD, SS, E> {
/// Marker only: `PG` is not stored.
_pg: PhantomData<PG>,
/// Marker only: the frame detector itself is moved into the worker thread.
_fd: PhantomData<FD>,
/// Marker only: the input stream itself is moved into the worker thread.
_ss: PhantomData<SS>,
/// Marker only: ties the stream error type `E` to this receiver.
_err: PhantomData<E>,
/// Modem used to demodulate received frame payloads into packets.
modem: MM,
/// Receiving end of the channel on which the worker delivers detected payloads.
frame_payload_rx: Receiver<FramePayload>,
/// Sending end of the channel used to tell the worker thread to exit.
exit_tx: Sender<()>,
/// Join handle of the worker thread; an `Option` so `drop` can take and join it.
handler: Option<JoinHandle<()>>,
}
impl<PG, MM, FD, SS, E> PhyReceiver<PG, MM, FD, SS, E>
where
    PG: PreambleGen,
    MM: Modem,
    FD: FrameDetector + Send + 'static,
    SS: InStream<FP, E> + Send + 'static,
    E: std::fmt::Debug,
{
    /// Body of the background worker thread. It repeatedly does the following:
    /// 0. exit if notified through the exit channel
    /// 1. once per fetch interval, fetch samples from the underlying stream
    /// 2. push them one by one to the frame detector
    /// 3. if a frame is detected, send its payload to the `PhyReceiver` through a channel
    ///
    /// # Panics
    /// Panics if reading from the stream fails or if the payload channel is closed.
    fn worker(
        mut stream_in: SS,
        mut frame_detector: FD,
        frame_payload_tx: Sender<FramePayload>,
        exit_rx: Receiver<()>,
    ) {
        // TODO: select a proper interval
        let fetch_interval = Duration::from_secs_f32(
            2.0 * DefaultConfig::BUFFER_SIZE as f32 / DefaultConfig::SAMPLE_RATE as f32,
        );
        // `None` until the first fetch, so the first iteration reads immediately.
        // This avoids computing `Instant::now() - fetch_interval`, which can underflow.
        let mut last_fetch: Option<Instant> = None;
        // TODO: select a proper buffer size
        let mut buf = [Sample::ZERO; DefaultConfig::BUFFER_SIZE * 8];
        while exit_rx.try_recv().is_err() {
            let fetch_due = last_fetch.map_or(true, |at| at.elapsed() > fetch_interval);
            if fetch_due {
                // Refresh the timestamp on every fetch; without this the interval check
                // would stay true forever and the stream would be polled on each spin.
                last_fetch = Some(Instant::now());
                let n = stream_in.read(&mut buf).unwrap();
                for &sample in &buf[..n] {
                    if let Some(payload) = frame_detector.on_sample(sample) {
                        frame_payload_tx.send(payload).unwrap();
                    }
                }
            }
            thread::yield_now();
        }
    }

    /// Creates a receiver and starts its worker thread.
    ///
    /// `stream_in` and `frame_detector` are moved into the worker; `modem` stays with the
    /// receiver and is used to demodulate the payloads the worker delivers.
    pub fn new(stream_in: SS, modem: MM, frame_detector: FD) -> Self {
        let (exit_tx, exit_rx) = unbounded_channel();
        let (frame_payload_tx, frame_payload_rx) = unbounded_channel();
        let handler = thread::spawn(move || {
            Self::worker(stream_in, frame_detector, frame_payload_tx, exit_rx)
        });
        Self {
            _pg: PhantomData,
            _fd: PhantomData,
            _ss: PhantomData,
            _err: PhantomData,
            modem,
            frame_payload_rx,
            exit_tx,
            handler: Some(handler),
        }
    }
}
impl<PG, MM, FD, SS, E> PacketReceiver<PhyPacket, ()> for PhyReceiver<PG, MM, FD, SS, E>
where
    PG: PreambleGen,
    MM: Modem,
    FD: FrameDetector,
    SS: InStream<FP, E>,
{
    /// Takes the next detected frame payload from the worker channel without blocking
    /// and demodulates it.
    ///
    /// # Errors
    /// Returns `Err(())` when no payload could be taken from the channel.
    fn recv(&mut self) -> Result<PhyPacket, ()> {
        let payload = self.frame_payload_rx.try_recv().map_err(|_| ())?;
        Ok(self.modem.demodulate(&payload))
    }

    /// Like `recv`, but waits up to `timeout` for a payload to arrive.
    ///
    /// # Errors
    /// Returns `Err(())` when no payload was received within `timeout`.
    fn recv_timeout(&mut self, timeout: Duration) -> Result<PhyPacket, ()> {
        let payload = self
            .frame_payload_rx
            .recv_timeout(timeout)
            .map_err(|_| ())?;
        Ok(self.modem.demodulate(&payload))
    }

    /// Reports whether at least one frame payload is currently queued.
    fn recv_peek(&mut self) -> bool {
        let queue_is_empty = self.frame_payload_rx.is_empty();
        !queue_is_empty
    }
}
impl<PG, MM, FD, SS, E> Drop for PhyReceiver<PG, MM, FD, SS, E> {
// notify the worker thread to exit
// wait for the worker thread to stop
fn drop(&mut self) {
self.exit_tx.send(()).unwrap();
if let Some(worker) = self.handler.take() {
worker.join().unwrap();
}
}
}