Skip to content

Commit

Permalink
Merge pull request #48 from hnez/error-handling
Browse files Browse the repository at this point in the history
watched_tasks: improve error handling in spawned tasks and threads
  • Loading branch information
hnez authored Jun 25, 2024
2 parents c06a7b4 + 79f241b commit c75d701
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 80 deletions.
57 changes: 15 additions & 42 deletions src/adc/iio/hardware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::path::Path;
use std::sync::atomic::{AtomicU16, AtomicU64, Ordering};
use std::time::{Duration, Instant};

use anyhow::{anyhow, Context, Error, Result};
use anyhow::{anyhow, Context, Result};
use async_std::channel::bounded;
use async_std::sync::Arc;

Expand Down Expand Up @@ -275,40 +275,27 @@ impl IioThread {
// setup was sucessful.
// This is why we create Self inside the thread and send it back
// to the calling thread via a queue.
let (thread_res_tx, thread_res_rx) = bounded(1);
let (thread_tx, thread_rx) = bounded(1);

// Spawn a high priority thread that updates the atomic values in `thread`.
wtb.spawn_thread(thread_name, move || {
let adc_setup_res = Self::adc_setup(
let (channels, mut buf) = Self::adc_setup(
adc_name,
trigger_name,
sample_rate,
channel_descs,
buffer_len,
);
let (thread, channels, mut buf) = match adc_setup_res {
Ok((channels, buf)) => {
let thread = Arc::new(Self {
ref_instant: Instant::now(),
timestamp: AtomicU64::new(TIMESTAMP_ERROR),
values: channels.iter().map(|_| AtomicU16::new(0)).collect(),
channel_descs,
});

(thread, channels, buf)
}
Err(e) => {
// Can not fail in practice as the queue is known to be empty
// at this point.
thread_res_tx
.try_send(Err(e))
.expect("Failed to signal ADC setup error due to full queue");
return Ok(());
}
};
)?;

let thread = Arc::new(Self {
ref_instant: Instant::now(),
timestamp: AtomicU64::new(TIMESTAMP_ERROR),
values: channels.iter().map(|_| AtomicU16::new(0)).collect(),
channel_descs,
});

let thread_weak = Arc::downgrade(&thread);
let mut signal_ready = Some((thread, thread_res_tx));
let mut signal_ready = Some((thread, thread_tx));

// Stop running as soon as the last reference to this Arc<IioThread>
// is dropped (e.g. the weak reference can no longer be upgraded).
Expand All @@ -318,18 +305,7 @@ impl IioThread {

error!("Failed to refill {} ADC buffer: {}", adc_name, e);

// If the ADC has not yet produced any values we still have the
// queue at hand that signals readiness to the main thread.
// This gives us a chance to return an Err from new().
// If the queue was already used just print an error instead.
if let Some((_, tx)) = signal_ready.take() {
// Can not fail in practice as the queue is only .take()n
// once and thus known to be empty.
tx.try_send(Err(Error::new(e)))
.expect("Failed to signal ADC setup error due to full queue");
}

break;
Err(e)?;
}

let values = channels.iter().map(|ch| {
Expand All @@ -356,17 +332,14 @@ impl IioThread {
if let Some((content, tx)) = signal_ready.take() {
// Can not fail in practice as the queue is only .take()n
// once and thus known to be empty.
tx.try_send(Ok(content))
.expect("Failed to signal ADC setup completion due to full queue");
tx.try_send(content)?;
}
}

Ok(())
})?;

let thread = thread_res_rx.recv().await??;

Ok(thread)
Ok(thread_rx.recv().await?)
}

pub async fn new_stm32(
Expand Down
2 changes: 1 addition & 1 deletion src/digital_io/gpio/demo_mode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub struct LineHandle {
}

impl LineHandle {
pub fn set_value(&self, val: u8) -> Result<(), ()> {
pub fn set_value(&self, val: u8) -> Result<()> {
// This does not actually set up any IIO things.
// It is just a hack to let adc/iio/demo_mode.rs
// communicate with this function so that toggling an output
Expand Down
2 changes: 1 addition & 1 deletion src/digital_io/gpio/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct LineHandle {
}

impl LineHandle {
pub fn set_value(&self, val: u8) -> Result<(), ()> {
pub fn set_value(&self, val: u8) -> Result<()> {
println!("GPIO simulation set {} to {}", self.name, val);
self.val.store(val, Ordering::Relaxed);
Ok(())
Expand Down
69 changes: 33 additions & 36 deletions src/dut_power.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,12 @@ mod prio {
use thread_priority::*;

pub fn realtime_priority() -> Result<()> {
let prio = ThreadPriorityValue::try_from(10)
.map_err(|e| anyhow!("Failed to choose realtime priority level 10: {e:?}"))?;

set_thread_priority_and_policy(
thread_native_id(),
ThreadPriority::Crossplatform(ThreadPriorityValue::try_from(10).unwrap()),
ThreadPriority::Crossplatform(prio),
ThreadSchedulePolicy::Realtime(RealtimeThreadSchedulePolicy::Fifo),
)
.map_err(|e| anyhow!("Failed to set up realtime priority {e:?}"))
Expand Down Expand Up @@ -260,10 +263,12 @@ fn turn_off_with_reason(
pwr_line: &LineHandle,
discharge_line: &LineHandle,
fail_state: &AtomicU8,
) {
pwr_line.set_value(1 - PWR_LINE_ASSERTED).unwrap();
discharge_line.set_value(DISCHARGE_LINE_ASSERTED).unwrap();
) -> Result<()> {
pwr_line.set_value(1 - PWR_LINE_ASSERTED)?;
discharge_line.set_value(DISCHARGE_LINE_ASSERTED)?;
fail_state.store(reason as u8, Ordering::Relaxed);

Ok(())
}

/// Labgrid has a fixed assumption of how a REST based power port should work.
Expand Down Expand Up @@ -333,7 +338,7 @@ impl DutPwrThread {
// as well.
// Use a queue to notify the calling thread if the priority setup
// succeeded.
let (thread_res_tx, mut thread_res_rx) = bounded(1);
let (thread_tx, thread_rx) = bounded(1);

// Spawn a high priority thread that handles the power status
// in a realtimey fashion.
Expand All @@ -348,24 +353,20 @@ impl DutPwrThread {
let mut volt_filter = MedianFilter::<4>::new();
let mut curr_filter = MedianFilter::<4>::new();

let (tick_weak, request, state) = match realtime_priority() {
Ok(_) => {
let tick = Arc::new(AtomicU32::new(0));
let tick_weak = Arc::downgrade(&tick);
realtime_priority()?;

let request = Arc::new(AtomicU8::new(OutputRequest::Idle as u8));
let state = Arc::new(AtomicU8::new(OutputState::Off as u8));
let (tick_weak, request, state) = {
let tick = Arc::new(AtomicU32::new(0));
let tick_weak = Arc::downgrade(&tick);

thread_res_tx
.try_send(Ok((tick, request.clone(), state.clone())))
.unwrap();
let request = Arc::new(AtomicU8::new(OutputRequest::Idle as u8));
let state = Arc::new(AtomicU8::new(OutputState::Off as u8));

(tick_weak, request, state)
}
Err(e) => {
thread_res_tx.try_send(Err(e)).unwrap();
panic!()
}
thread_tx
.try_send((tick, request.clone(), state.clone()))
.expect("Queue that should be empty wasn't");

(tick_weak, request, state)
};

// The grace period contains the number of loop iterations until
Expand Down Expand Up @@ -406,7 +407,7 @@ impl DutPwrThread {
&pwr_line,
&discharge_line,
&state,
);
)?;
} else {
// We have a fresh ADC value. Signal "everything is well"
// to the watchdog task.
Expand Down Expand Up @@ -463,7 +464,7 @@ impl DutPwrThread {
&pwr_line,
&discharge_line,
&state,
);
)?;

continue;
}
Expand All @@ -474,7 +475,7 @@ impl DutPwrThread {
&pwr_line,
&discharge_line,
&state,
);
)?;

continue;
}
Expand All @@ -485,7 +486,7 @@ impl DutPwrThread {
&pwr_line,
&discharge_line,
&state,
);
)?;

continue;
}
Expand All @@ -496,34 +497,30 @@ impl DutPwrThread {
match req {
OutputRequest::Idle => {}
OutputRequest::On => {
discharge_line
.set_value(1 - DISCHARGE_LINE_ASSERTED)
.unwrap();
pwr_line.set_value(PWR_LINE_ASSERTED).unwrap();
discharge_line.set_value(1 - DISCHARGE_LINE_ASSERTED)?;
pwr_line.set_value(PWR_LINE_ASSERTED)?;
state.store(OutputState::On as u8, Ordering::Relaxed);
}
OutputRequest::Off => {
discharge_line.set_value(DISCHARGE_LINE_ASSERTED).unwrap();
pwr_line.set_value(1 - PWR_LINE_ASSERTED).unwrap();
discharge_line.set_value(DISCHARGE_LINE_ASSERTED)?;
pwr_line.set_value(1 - PWR_LINE_ASSERTED)?;
state.store(OutputState::Off as u8, Ordering::Relaxed);
}
OutputRequest::OffFloating => {
discharge_line
.set_value(1 - DISCHARGE_LINE_ASSERTED)
.unwrap();
pwr_line.set_value(1 - PWR_LINE_ASSERTED).unwrap();
discharge_line.set_value(1 - DISCHARGE_LINE_ASSERTED)?;
pwr_line.set_value(1 - PWR_LINE_ASSERTED)?;
state.store(OutputState::OffFloating as u8, Ordering::Relaxed);
}
}
}

// Make sure to enter fail safe mode before leaving the thread
turn_off_with_reason(OutputState::Off, &pwr_line, &discharge_line, &state);
turn_off_with_reason(OutputState::Off, &pwr_line, &discharge_line, &state)?;

Ok(())
})?;

let (tick, request, state) = thread_res_rx.next().await.unwrap()?;
let (tick, request, state) = thread_rx.recv().await?;

// The request and state topic use the same external path, this way one
// can e.g. publish "On" to the topic and be sure that the output is
Expand Down

0 comments on commit c75d701

Please sign in to comment.