Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcu-util: blocking call to can-rs in async #85

Merged
merged 10 commits into from
May 7, 2024
56 changes: 25 additions & 31 deletions mcu-util/src/messaging/can/canfd.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@
use std::process;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::mpsc;

use async_trait::async_trait;
use can_rs::filter::Filter;
use can_rs::stream::FrameStream;
use can_rs::{Frame, Id, CANFD_DATA_LEN};
use eyre::{eyre, Context, Result};
use orb_messages::CommonAckError;
use prost::Message;
use std::process;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::{mpsc, Arc};
use tokio::time::Duration;
use tracing::debug;

use can_rs::filter::Filter;
use can_rs::stream::FrameStream;
use can_rs::{Frame, Id};

use crate::messaging::Device::{JetsonFromMain, JetsonFromSecurity, Main, Security};
use crate::messaging::{
handle_main_mcu_message, handle_sec_mcu_message, Device, McuPayload,
MessagingInterface,
};

pub struct CanRawMessaging {
stream: FrameStream<64>,
stream: FrameStream<CANFD_DATA_LEN>,
ack_num_lsb: AtomicU16,
ack_queue: mpsc::Receiver<(CommonAckError, u32)>,
can_node: Device,
Expand All @@ -35,7 +33,7 @@ impl CanRawMessaging {
new_message_queue: mpsc::Sender<McuPayload>,
) -> Result<Self> {
// open socket
let stream = FrameStream::<64>::build()
let stream = FrameStream::<CANFD_DATA_LEN>::build()
.nonblocking(false)
.filters(vec![
Filter {
Expand All @@ -47,13 +45,14 @@ impl CanRawMessaging {
mask: 0xff,
},
])
.bind(bus.as_str().parse().unwrap())?;
.bind(bus.as_str().parse().unwrap())
.wrap_err("Failed to bind CAN stream")?;

let (ack_tx, ack_rx) = mpsc::channel();

// spawn CAN receiver
let stream_copy = stream.try_clone()?;
tokio::task::spawn(can_rx(stream_copy, can_node, ack_tx, new_message_queue));
tokio::task::spawn_blocking(move || {
can_rx(stream_copy, can_node, ack_tx, new_message_queue)
});

Ok(Self {
stream,
Expand All @@ -78,9 +77,12 @@ impl CanRawMessaging {
}
}

#[allow(dead_code)]
async fn send_wait_ack(&mut self, frame: &Frame<64>) -> Result<CommonAckError> {
self.stream.send(frame, 0)?;
async fn send_wait_ack(
&mut self,
frame: Arc<Frame<CANFD_DATA_LEN>>,
) -> Result<CommonAckError> {
let stream = self.stream.try_clone()?;
tokio::task::spawn_blocking(move || stream.send(&frame, 0)).await??;

// put some randomness into ack number to prevent collision with other processes
let expected_ack_number =
Expand All @@ -94,15 +96,14 @@ impl CanRawMessaging {
/// Receive CAN frames
/// - relay acks to `ack_tx`
/// - relay new McuMessage to `new_message_queue`
#[allow(dead_code)]
async fn can_rx(
stream: FrameStream<64>,
fn can_rx(
stream: FrameStream<CANFD_DATA_LEN>,
remote_node: Device,
ack_tx: mpsc::Sender<(CommonAckError, u32)>,
new_message_queue: mpsc::Sender<McuPayload>,
) -> Result<()> {
loop {
let mut frame: Frame<64> = Frame::empty();
let mut frame: Frame<CANFD_DATA_LEN> = Frame::empty();
loop {
match stream.recv(&mut frame, 0) {
Ok(_) => {
Expand Down Expand Up @@ -146,7 +147,6 @@ async fn can_rx(
#[async_trait]
impl MessagingInterface for CanRawMessaging {
/// Send payload into McuMessage
#[allow(dead_code)]
async fn send(&mut self, payload: McuPayload) -> Result<CommonAckError> {
// snowflake ack ID to avoid collisions:
// prefix ack number with process ID
Expand Down Expand Up @@ -201,26 +201,20 @@ impl MessagingInterface for CanRawMessaging {
};

if let Some(bytes) = bytes {
let mut buf: [u8; 64] = [0u8; 64];
let mut buf: [u8; CANFD_DATA_LEN] = [0u8; CANFD_DATA_LEN];
buf[..bytes.len()].copy_from_slice(bytes.as_slice());
fouge marked this conversation as resolved.
Show resolved Hide resolved

let node_addr = self.can_node as u32;
let frame = Frame {
id: Id::Extended(node_addr),
len: 64,
len: CANFD_DATA_LEN as u8,
flags: 0x0F,
TheButlah marked this conversation as resolved.
Show resolved Hide resolved
data: buf,
};

self.send_wait_ack(&frame).await
self.send_wait_ack(Arc::new(frame)).await
} else {
Err(eyre!("Failed to encode payload"))
}
}
}

impl Drop for CanRawMessaging {
fn drop(&mut self) {
// TODO
}
}
58 changes: 31 additions & 27 deletions mcu-util/src/messaging/can/isotp.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
use std::io::{Read, Write};
use std::process;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::mpsc;

use async_trait::async_trait;
use eyre::{eyre, Context, Result};
use orb_messages::CommonAckError;
use prost::Message;
use std::io::{Read, Write};
use std::process;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::{mpsc, Arc};
use tokio::time::Duration;
use tracing::{debug, error};

use can_rs::isotp::addr::CanIsotpAddr;
use can_rs::isotp::stream::IsotpStream;
use can_rs::Id;
use can_rs::{Id, CAN_DATA_LEN};

use crate::messaging::{
handle_main_mcu_message, handle_sec_mcu_message, McuPayload, MessagingInterface,
Expand Down Expand Up @@ -69,7 +68,7 @@ impl From<u8> for IsoTpNodeIdentifier {
}

pub struct CanIsoTpMessaging {
tx_stream: IsotpStream<8>,
stream: IsotpStream<CAN_DATA_LEN>,
ack_num_lsb: AtomicU16,
ack_queue: mpsc::Receiver<(CommonAckError, u32)>,
}
Expand Down Expand Up @@ -103,23 +102,27 @@ impl CanIsoTpMessaging {
let (tx_stdid_src, tx_stdid_dst) = create_pair(local, remote)?;
debug!("Sending on 0x{:x}->0x{:x}", tx_stdid_src, tx_stdid_dst);

let (ack_tx, ack_rx) = mpsc::channel();

// open TX stream
let tx_isotp_stream = IsotpStream::<8>::build().bind(
CanIsotpAddr::new(
bus.as_str(),
Id::Standard(tx_stdid_dst),
Id::Standard(tx_stdid_src),
let tx_isotp_stream = IsotpStream::<CAN_DATA_LEN>::build()
.bind(
CanIsotpAddr::new(
bus.as_str(),
Id::Standard(tx_stdid_dst),
Id::Standard(tx_stdid_src),
)
.expect("Unable to build IsoTpStream"),
)
.expect("Unable to build IsoTpStream"),
)?;
.wrap_err("Failed to bind CAN ISO-TP stream")?;

let (ack_tx, ack_rx) = mpsc::channel();

// spawn CAN receiver
tokio::task::spawn(can_rx(bus, remote, local, ack_tx, new_message_queue));
tokio::task::spawn_blocking(move || {
can_rx(bus, remote, local, ack_tx, new_message_queue)
});

Ok(CanIsoTpMessaging {
tx_stream: tx_isotp_stream,
stream: tx_isotp_stream,
ack_num_lsb: AtomicU16::new(0),
ack_queue: ack_rx,
})
Expand All @@ -140,13 +143,14 @@ impl CanIsoTpMessaging {
}
}

async fn send_wait_ack(&mut self, frame: &[u8]) -> Result<CommonAckError> {
match self.tx_stream.write(frame) {
Ok(_) => {}
Err(err) => {
error!("Error writing stream: {}", err);
async fn send_wait_ack(&mut self, frame: Arc<Vec<u8>>) -> Result<CommonAckError> {
let mut stream = self.stream.try_clone()?;
tokio::task::spawn_blocking(move || {
if let Err(e) = stream.write(frame.as_slice()) {
error!("Error writing stream: {}", e);
}
}
})
.await?;

let expected_ack_number =
process::id() << 16 | self.ack_num_lsb.load(Ordering::Relaxed) as u32;
Expand All @@ -159,7 +163,7 @@ impl CanIsoTpMessaging {
/// Receive CAN frames
/// - relay acks to `ack_tx`
/// - relay new McuMessage to `new_message_queue`
async fn can_rx(
fn can_rx(
bus: String,
remote: IsoTpNodeIdentifier,
local: IsoTpNodeIdentifier,
Expand All @@ -170,7 +174,7 @@ async fn can_rx(
let (rx_stdid_src, rx_stdid_dest) = create_pair(remote, local)?;
debug!("Listening on 0x{:x}->0x{:x}", rx_stdid_src, rx_stdid_dest);

let mut rx_isotp_stream = IsotpStream::<8>::build().bind(
let mut rx_isotp_stream = IsotpStream::<CAN_DATA_LEN>::build().bind(
CanIsotpAddr::new(
bus.as_str(),
Id::Standard(rx_stdid_src),
Expand Down Expand Up @@ -256,6 +260,6 @@ impl MessagingInterface for CanIsoTpMessaging {
_ => return Err(eyre!("Invalid payload")),
};

self.send_wait_ack(bytes.as_slice()).await
self.send_wait_ack(Arc::new(bytes)).await
}
TheButlah marked this conversation as resolved.
Show resolved Hide resolved
}
Loading