Skip to content

Commit

Permalink
[AsyncCAN] Handle RecvError::Lagged and RecvError::Closed (#57)
Browse files Browse the repository at this point in the history
* [AsyncCAN] Handle RecvError::Lagged

* handle socketcan network down
  • Loading branch information
pd0wm authored Apr 1, 2024
1 parent 65e8465 commit 0969a5f
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 12 deletions.
12 changes: 7 additions & 5 deletions src/can/async_can.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::can::Frame;
use crate::can::Identifier;
use crate::Stream;
use async_stream::stream;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::{broadcast, mpsc, oneshot};
use tracing::debug;

Expand All @@ -27,7 +28,7 @@ fn process<T: CanAdapter>(
let mut callbacks: HashMap<BusIdentifier, VecDeque<FrameCallback>> = HashMap::new();

while shutdown_receiver.try_recv().is_err() {
let frames: Vec<Frame> = adapter.recv().unwrap();
let frames: Vec<Frame> = adapter.recv().expect("Failed to Receive CAN Frames");

for frame in frames {
if DEBUG {
Expand Down Expand Up @@ -142,11 +143,12 @@ impl AsyncCanAdapter {
Ok(frame) => {
if filter(&frame) {
yield frame
} else {
continue
}
}
Err(_) => continue,
},
Err(RecvError::Closed) => panic!("Adapter thread has exited"),
Err(RecvError::Lagged(n)) => {
tracing::warn!("Receive too slow, dropping {} frame(s).", n)
},
}
}
})
Expand Down
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ pub enum Error {
MalformedFrame,
#[error("Timeout")]
Timeout,
#[error("Disconnected")]
Disconnected,
#[error(transparent)]
IsoTPError(#[from] crate::isotp::Error),
#[error(transparent)]
Expand Down
19 changes: 15 additions & 4 deletions src/socketcan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,21 @@ impl CanAdapter for SocketCan {
fn recv(&mut self) -> Result<Vec<Frame>> {
let mut frames = vec![];

while let Ok((frame, meta)) = self.socket.read_frame_with_meta() {
let mut frame: crate::can::Frame = frame.into();
frame.loopback = meta.loopback;
frames.push(frame);
loop {
match self.socket.read_frame_with_meta() {
Ok((frame, meta)) => {
let mut frame: crate::can::Frame = frame.into();
frame.loopback = meta.loopback;
frames.push(frame);
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
break;
}
Err(e) => {
tracing::error!("Error reading frame: {}", e);
return Err(crate::error::Error::Disconnected);
}
}
}

// Add fake loopback frames to the receive queue
Expand Down
8 changes: 5 additions & 3 deletions tests/adapter_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use std::time::Duration;

static BULK_NUM_FRAMES_SYNC: u64 = 0x100;
static BULK_NUM_FRAMES_ASYNC: u64 = 0x1000;
static BULK_TIMEOUT_MS: u64 = 1000;
static BULK_SYNC_TIMEOUT_MS: u64 = 1000;
static BULK_ASYNC_TIMEOUT_MS: u64 = 5000;

/// Sends a large number of frames to a "blocking" adapter, and then reads back all sent messages.
/// This verified the adapter doesn't drop messages and reads them back in the same order as they are sent,
Expand All @@ -27,7 +28,8 @@ fn bulk_send_sync<T: CanAdapter>(adapter: &mut T) {
let start = std::time::Instant::now();

let mut received: Vec<Frame> = vec![];
while received.len() < frames.len() && start.elapsed() < Duration::from_millis(BULK_TIMEOUT_MS)
while received.len() < frames.len()
&& start.elapsed() < Duration::from_millis(BULK_SYNC_TIMEOUT_MS)
{
let rx = adapter.recv().unwrap();
let rx: Vec<Frame> = rx.into_iter().filter(|frame| frame.loopback).collect();
Expand Down Expand Up @@ -55,7 +57,7 @@ async fn bulk_send(adapter: &AsyncCanAdapter) {

let r = frames.iter().map(|frame| adapter.send(frame));
tokio::time::timeout(
Duration::from_millis(BULK_TIMEOUT_MS),
Duration::from_millis(BULK_ASYNC_TIMEOUT_MS),
futures::future::join_all(r),
)
.await
Expand Down

0 comments on commit 0969a5f

Please sign in to comment.