Skip to content

Commit

Permalink
Moved panic messages to constant mod.
Browse files Browse the repository at this point in the history
  • Loading branch information
yotarok committed Oct 4, 2023
1 parent 5a3be6d commit 3030142
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 24 deletions.
17 changes: 17 additions & 0 deletions src/constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,20 @@ pub const QLPC_DEFAULT_ORDER: usize = 10;

/// Default precision for storing QLPC coefficients.
pub const QLPC_DEFAULT_PRECISION: usize = 12;

/// Module for internal error messages.
///
/// Use `panic!` and those messages only for env-related unrecoveralbe errors.
/// It's okay to use them in tests, but it's not okay to add another variable
/// only for test functions.
pub mod panic_msg {
pub const MPMC_SEND_FAILED: &str =
"INTERNAL ERROR: Critical error occured in multi-thread communication channel.";
pub const MPMC_RECV_FAILED: &str =
"INTERNAL ERROR: Critical error occured in multi-thread communication channel.";
pub const MUTEX_LOCK_FAILED: &str = "INTERNAL ERROR: Couldn't get lock for mutex.";
pub const MUTEX_DROP_FAILED: &str = "INTERNAL ERROR: Couldn't discard mutex.";
pub const FRAMENUM_NOT_SET: &str =
"INTERNAL ERROR: Frame buffer is not properly initialized. (FrameNo. not set).";
pub const THREAD_JOIN_FAILED: &str = "INTERNAL ERROR: Failed to wait thread termination.";
}
61 changes: 37 additions & 24 deletions src/par.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use super::component::Frame;
use super::component::Stream;
use super::config;
use super::constant;
use super::constant::panic_msg;
use super::error::SourceError;
use super::source::Context;
use super::source::FrameBuf;
Expand All @@ -34,13 +35,6 @@ use crossbeam_channel;
use crossbeam_channel::Receiver;
use crossbeam_channel::Sender;

const MPMC_CHANNEL_PANIC_MSG: &str =
"INTERNAL ERROR: Critical error occured in multi-thread communication channel.";
const MUTEX_PANIC_MSG: &str = "INTERNAL ERROR: Couldn't get lock for mutex.";
const UNINIT_FRAMENUM_PANIC_MSG: &str =
"INTERNAL ERROR: Frame buffer is not properly initialized. (FrameNo. not set).";
const THREAD_TERMINATION_PANIC_MSG: &str = "INTERNAL ERROR: Failed to wait thread termination.";

/// Sink object that stores encoding results.
///
/// This is currently just a `BTreeMap<usize, T>` with some utility functions.
Expand All @@ -58,7 +52,7 @@ impl<T> ParSink<T> {

/// Stores a computation result `element` with a serial id `idx`.
pub fn push(&self, idx: usize, element: T) {
let mut data = self.data.lock().expect(MUTEX_PANIC_MSG);
let mut data = self.data.lock().expect(panic_msg::MUTEX_LOCK_FAILED);
data.insert(idx, element);
}

Expand All @@ -67,7 +61,7 @@ impl<T> ParSink<T> {
where
F: FnMut(T),
{
let data = self.data.into_inner().expect(MUTEX_PANIC_MSG);
let data = self.data.into_inner().expect(panic_msg::MUTEX_DROP_FAILED);
data.into_values().for_each(f);
}
}
Expand Down Expand Up @@ -99,7 +93,7 @@ impl ParFrameBuf {
let (refill_sender, refill_receiver) = crossbeam_channel::bounded(replicas + 1);

(0..replicas).for_each(|t| {
refill_sender.send(t).expect(MPMC_CHANNEL_PANIC_MSG);
refill_sender.send(t).expect(panic_msg::MPMC_SEND_FAILED);
});
Self {
buffers,
Expand All @@ -124,10 +118,16 @@ impl ParFrameBuf {
let mut src = src;

'feed: loop {
let bufid = self.refill_queue.1.recv().expect(MPMC_CHANNEL_PANIC_MSG);
let bufid = self
.refill_queue
.1
.recv()
.expect(panic_msg::MPMC_RECV_FAILED);

{
let mut numbuf = self.buffers[bufid].lock().expect(MPMC_CHANNEL_PANIC_MSG);
let mut numbuf = self.buffers[bufid]
.lock()
.expect(panic_msg::MUTEX_LOCK_FAILED);
let read_samples = src.read_samples(&mut numbuf.framebuf, context)?;
if read_samples == 0 {
break 'feed;
Expand All @@ -137,13 +137,13 @@ impl ParFrameBuf {
self.encode_queue
.0
.send(Some(bufid))
.expect(MPMC_CHANNEL_PANIC_MSG);
.expect(panic_msg::MPMC_SEND_FAILED);
}
for _i in 0..workers {
self.encode_queue
.0
.send(None)
.expect(MPMC_CHANNEL_PANIC_MSG);
.expect(panic_msg::MPMC_SEND_FAILED);
}
Ok(())
}
Expand All @@ -153,13 +153,18 @@ impl ParFrameBuf {
/// If this returns None, workder thread must immediately stop.
#[inline]
pub fn pop_encode_queue(&self) -> Option<usize> {
self.encode_queue.1.recv().expect(MPMC_CHANNEL_PANIC_MSG)
self.encode_queue
.1
.recv()
.expect(panic_msg::MPMC_RECV_FAILED)
}

/// Locks `FrameBuf` with the specified id and returns `MutexGuard`.
#[inline]
pub fn lock_buffer(&self, bufid: usize) -> std::sync::MutexGuard<'_, NumberedFrameBuf> {
self.buffers[bufid].lock().expect(MUTEX_PANIC_MSG)
self.buffers[bufid]
.lock()
.expect(panic_msg::MUTEX_LOCK_FAILED)
}

/// Requests refill for `FrameBuf` with the specified id.
Expand All @@ -168,7 +173,7 @@ impl ParFrameBuf {
self.refill_queue
.0
.send(bufid)
.expect(MPMC_CHANNEL_PANIC_MSG);
.expect(panic_msg::MPMC_SEND_FAILED);
}
}

Expand Down Expand Up @@ -220,7 +225,7 @@ pub fn encode_with_fixed_block_size<T: Source>(
while let Some(bufid) = parbuf.pop_encode_queue() {
let (frame_number, mut frame) = {
let numbuf = &parbuf.lock_buffer(bufid);
let frame_number = numbuf.frame_number.expect(UNINIT_FRAMENUM_PANIC_MSG);
let frame_number = numbuf.frame_number.expect(panic_msg::FRAMENUM_NOT_SET);
(
frame_number,
coding::encode_fixed_size_frame(
Expand All @@ -244,7 +249,7 @@ pub fn encode_with_fixed_block_size<T: Source>(
parbuf.feed(src, &mut context, worker_count)?;

for h in join_handles {
h.join().expect(THREAD_TERMINATION_PANIC_MSG);
h.join().expect(panic_msg::THREAD_JOIN_FAILED);
}

Arc::into_inner(parsink)
Expand Down Expand Up @@ -310,21 +315,29 @@ mod tests {
}
let src = source::PreloadedSignal::from_samples(&signal, channels, 16, 16000);
let mut ctx = Context::new(16, channels);
pfb.feed(src, &mut ctx, workers).expect("Expect no error.");
pfb.feed(src, &mut ctx, workers).expect("Feeding failed");
});
}
thread::sleep(std::time::Duration::from_secs_f32(0.1));
assert_eq!(pfb.encode_queue.1.len(), 3);
for _t in 0..6 {
let received = pfb.encode_queue.1.recv().expect("Expect no error.");
let received = pfb
.encode_queue
.1
.recv()
.expect(panic_msg::MPMC_RECV_FAILED);
assert!(received.is_some());
let bufid = received.unwrap();
pfb.enqueue_refill(bufid);
}
thread::sleep(std::time::Duration::from_secs_f32(0.1));
assert_eq!(pfb.encode_queue.1.len(), workers); // stop signal.
for _t in 0..workers {
let received = pfb.encode_queue.1.recv().expect("Expect no error.");
let received = pfb
.encode_queue
.1
.recv()
.expect(panic_msg::MPMC_RECV_FAILED);
assert!(received.is_none());
}
}
Expand All @@ -338,11 +351,11 @@ mod tests {
thread::spawn(move || sink.push(t, REFERENCE[t]))
});
for h in handles {
h.join().expect("join failed.");
h.join().expect(panic_msg::THREAD_JOIN_FAILED);
}
let mut result = vec![];
Arc::into_inner(sink)
.expect("Expect no error.")
.expect("Arc deconstruction failed")
.finalize(|v| result.push(v));
assert_eq!(result, REFERENCE);
}
Expand Down

0 comments on commit 3030142

Please sign in to comment.