Skip to content

Commit

Permalink
mcu-util: refactor blocking call to can-rs in async
Browse files Browse the repository at this point in the history
and some tiny refacto
  • Loading branch information
fouge committed May 2, 2024
1 parent 613254f commit fc65538
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 58 deletions.
56 changes: 25 additions & 31 deletions mcu-util/src/messaging/can/canfd.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@
use std::process;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::mpsc;

use async_trait::async_trait;
use can_rs::filter::Filter;
use can_rs::stream::FrameStream;
use can_rs::{Frame, Id, CANFD_DATA_LEN};
use eyre::{eyre, Context, Result};
use orb_messages::CommonAckError;
use prost::Message;
use std::process;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::{mpsc, Arc};
use tokio::time::Duration;
use tracing::debug;

use can_rs::filter::Filter;
use can_rs::stream::FrameStream;
use can_rs::{Frame, Id};

use crate::messaging::Device::{JetsonFromMain, JetsonFromSecurity, Main, Security};
use crate::messaging::{
handle_main_mcu_message, handle_sec_mcu_message, Device, McuPayload,
MessagingInterface,
};

pub struct CanRawMessaging {
stream: FrameStream<64>,
stream: FrameStream<CANFD_DATA_LEN>,
ack_num_lsb: AtomicU16,
ack_queue: mpsc::Receiver<(CommonAckError, u32)>,
can_node: Device,
Expand All @@ -35,7 +33,7 @@ impl CanRawMessaging {
new_message_queue: mpsc::Sender<McuPayload>,
) -> Result<Self> {
// open socket
let stream = FrameStream::<64>::build()
let stream = FrameStream::<CANFD_DATA_LEN>::build()
.nonblocking(false)
.filters(vec![
Filter {
Expand All @@ -47,13 +45,14 @@ impl CanRawMessaging {
mask: 0xff,
},
])
.bind(bus.as_str().parse().unwrap())?;
.bind(bus.as_str().parse().unwrap())
.wrap_err("Failed to bind CAN stream")?;

let (ack_tx, ack_rx) = mpsc::channel();

// spawn CAN receiver
let stream_copy = stream.try_clone()?;
tokio::task::spawn(can_rx(stream_copy, can_node, ack_tx, new_message_queue));
tokio::task::spawn_blocking(move || {
can_rx(stream_copy, can_node, ack_tx, new_message_queue)
});

Ok(Self {
stream,
Expand All @@ -78,9 +77,12 @@ impl CanRawMessaging {
}
}

#[allow(dead_code)]
async fn send_wait_ack(&mut self, frame: &Frame<64>) -> Result<CommonAckError> {
self.stream.send(frame, 0)?;
async fn send_wait_ack(
&mut self,
frame: Arc<Frame<CANFD_DATA_LEN>>,
) -> Result<CommonAckError> {
let stream = self.stream.try_clone()?;
tokio::task::spawn_blocking(move || stream.send(&frame, 0)).await??;

// put some randomness into ack number to prevent collision with other processes
let expected_ack_number =
Expand All @@ -94,15 +96,14 @@ impl CanRawMessaging {
/// Receive CAN frames
/// - relay acks to `ack_tx`
/// - relay new McuMessage to `new_message_queue`
#[allow(dead_code)]
async fn can_rx(
stream: FrameStream<64>,
fn can_rx(
stream: FrameStream<CANFD_DATA_LEN>,
remote_node: Device,
ack_tx: mpsc::Sender<(CommonAckError, u32)>,
new_message_queue: mpsc::Sender<McuPayload>,
) -> Result<()> {
loop {
let mut frame: Frame<64> = Frame::empty();
let mut frame: Frame<CANFD_DATA_LEN> = Frame::empty();
loop {
match stream.recv(&mut frame, 0) {
Ok(_) => {
Expand Down Expand Up @@ -146,7 +147,6 @@ async fn can_rx(
#[async_trait]
impl MessagingInterface for CanRawMessaging {
/// Send payload into McuMessage
#[allow(dead_code)]
async fn send(&mut self, payload: McuPayload) -> Result<CommonAckError> {
// snowflake ack ID to avoid collisions:
// prefix ack number with process ID
Expand Down Expand Up @@ -201,26 +201,20 @@ impl MessagingInterface for CanRawMessaging {
};

if let Some(bytes) = bytes {
let mut buf: [u8; 64] = [0u8; 64];
let mut buf: [u8; CANFD_DATA_LEN] = [0u8; CANFD_DATA_LEN];
buf[..bytes.len()].copy_from_slice(bytes.as_slice());

let node_addr = self.can_node as u32;
let frame = Frame {
id: Id::Extended(node_addr),
len: 64,
len: CANFD_DATA_LEN as u8,
flags: 0x0F,
data: buf,
};

self.send_wait_ack(&frame).await
self.send_wait_ack(Arc::new(frame)).await
} else {
Err(eyre!("Failed to encode payload"))
}
}
}

impl Drop for CanRawMessaging {
fn drop(&mut self) {
// TODO
}
}
58 changes: 31 additions & 27 deletions mcu-util/src/messaging/can/isotp.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
use std::io::{Read, Write};
use std::process;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::mpsc;

use async_trait::async_trait;
use eyre::{eyre, Context, Result};
use orb_messages::CommonAckError;
use prost::Message;
use std::io::{Read, Write};
use std::process;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::{mpsc, Arc};
use tokio::time::Duration;
use tracing::{debug, error};

use can_rs::isotp::addr::CanIsotpAddr;
use can_rs::isotp::stream::IsotpStream;
use can_rs::Id;
use can_rs::{Id, CAN_DATA_LEN};

use crate::messaging::{
handle_main_mcu_message, handle_sec_mcu_message, McuPayload, MessagingInterface,
Expand Down Expand Up @@ -69,7 +68,7 @@ impl From<u8> for IsoTpNodeIdentifier {
}

pub struct CanIsoTpMessaging {
tx_stream: IsotpStream<8>,
stream: IsotpStream<CAN_DATA_LEN>,
ack_num_lsb: AtomicU16,
ack_queue: mpsc::Receiver<(CommonAckError, u32)>,
}
Expand Down Expand Up @@ -103,23 +102,27 @@ impl CanIsoTpMessaging {
let (tx_stdid_src, tx_stdid_dst) = create_pair(local, remote)?;
debug!("Sending on 0x{:x}->0x{:x}", tx_stdid_src, tx_stdid_dst);

let (ack_tx, ack_rx) = mpsc::channel();

// open TX stream
let tx_isotp_stream = IsotpStream::<8>::build().bind(
CanIsotpAddr::new(
bus.as_str(),
Id::Standard(tx_stdid_dst),
Id::Standard(tx_stdid_src),
let tx_isotp_stream = IsotpStream::<CAN_DATA_LEN>::build()
.bind(
CanIsotpAddr::new(
bus.as_str(),
Id::Standard(tx_stdid_dst),
Id::Standard(tx_stdid_src),
)
.expect("Unable to build IsoTpStream"),
)
.expect("Unable to build IsoTpStream"),
)?;
.wrap_err("Failed to bind CAN ISO-TP stream")?;

let (ack_tx, ack_rx) = mpsc::channel();

// spawn CAN receiver
tokio::task::spawn(can_rx(bus, remote, local, ack_tx, new_message_queue));
tokio::task::spawn_blocking(move || {
can_rx(bus, remote, local, ack_tx, new_message_queue)
});

Ok(CanIsoTpMessaging {
tx_stream: tx_isotp_stream,
stream: tx_isotp_stream,
ack_num_lsb: AtomicU16::new(0),
ack_queue: ack_rx,
})
Expand All @@ -140,13 +143,14 @@ impl CanIsoTpMessaging {
}
}

async fn send_wait_ack(&mut self, frame: &[u8]) -> Result<CommonAckError> {
match self.tx_stream.write(frame) {
Ok(_) => {}
Err(err) => {
error!("Error writing stream: {}", err);
async fn send_wait_ack(&mut self, frame: Arc<Vec<u8>>) -> Result<CommonAckError> {
let mut stream = self.stream.try_clone()?;
tokio::task::spawn_blocking(move || {
if let Err(e) = stream.write(frame.as_slice()) {
error!("Error writing stream: {}", e);
}
}
})
.await?;

let expected_ack_number =
process::id() << 16 | self.ack_num_lsb.load(Ordering::Relaxed) as u32;
Expand All @@ -159,7 +163,7 @@ impl CanIsoTpMessaging {
/// Receive CAN frames
/// - relay acks to `ack_tx`
/// - relay new McuMessage to `new_message_queue`
async fn can_rx(
fn can_rx(
bus: String,
remote: IsoTpNodeIdentifier,
local: IsoTpNodeIdentifier,
Expand All @@ -170,7 +174,7 @@ async fn can_rx(
let (rx_stdid_src, rx_stdid_dest) = create_pair(remote, local)?;
debug!("Listening on 0x{:x}->0x{:x}", rx_stdid_src, rx_stdid_dest);

let mut rx_isotp_stream = IsotpStream::<8>::build().bind(
let mut rx_isotp_stream = IsotpStream::<CAN_DATA_LEN>::build().bind(
CanIsotpAddr::new(
bus.as_str(),
Id::Standard(rx_stdid_src),
Expand Down Expand Up @@ -256,6 +260,6 @@ impl MessagingInterface for CanIsoTpMessaging {
_ => return Err(eyre!("Invalid payload")),
};

self.send_wait_ack(bytes.as_slice()).await
self.send_wait_ack(Arc::new(bytes)).await
}
}

0 comments on commit fc65538

Please sign in to comment.