Skip to content

Commit

Permalink
properly separate communication handling and packet parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
Florian Guggi committed Dec 26, 2023
1 parent 0c4447a commit 7fb2e65
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 57 deletions.
2 changes: 1 addition & 1 deletion src/command/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl From<CommunicationError> for CommandError {
fn from(e: CommunicationError) -> Self {
match e {
CommunicationError::PacketInvalidError => CommandError::External(Box::new(e)),
CommunicationError::CRCError => CommandError::ProtocolViolation(Box::new(e)),
CommunicationError::CepParsing(_) => CommandError::ProtocolViolation(Box::new(e)),
CommunicationError::Io(_) => CommandError::NonRecoverable(Box::new(e)),
CommunicationError::StopCondition => CommandError::External(Box::new(e)),
CommunicationError::NotAcknowledged => CommandError::ProtocolViolation(Box::new(e)),
Expand Down
55 changes: 48 additions & 7 deletions src/communication/cep.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::io::Read;

use crc::{Crc, CRC_32_MPEG_2};

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -34,20 +36,18 @@ impl CEPPacket {
}

pub fn serialize(self) -> Vec<u8> {
let header = self.header();
match self {
CEPPacket::Ack => vec![0xd7],
CEPPacket::Nack => vec![0x27],
CEPPacket::Stop => vec![0xb4],
CEPPacket::Eof => vec![0x59],
CEPPacket::Data(bytes) => {
let mut v = vec![0x8b];
let mut v = Vec::with_capacity(7 + bytes.len());
let crc32 = CEPPacket::CRC.checksum(&bytes);
v.reserve_exact(6 + bytes.len());
v.push(header);
v.extend((bytes.len() as u16).to_le_bytes());
v.extend(bytes);
v.extend(crc32.to_le_bytes());
v
}
_ => vec![header],
}
}

Expand All @@ -65,6 +65,38 @@ impl CEPPacket {
};
header as u8
}

pub fn try_from_read(reader: &mut (impl Read + ?Sized)) -> Result<Self, CEPParseError> {
let mut header_buffer = [0; 1];
reader.read_exact(&mut header_buffer)?;

let header = CEPPacketHeader::from_repr(header_buffer[0] as usize)
.ok_or(CEPParseError::WrongLength)?;
let packet = match header {
CEPPacketHeader::Ack => CEPPacket::Ack,
CEPPacketHeader::Nack => CEPPacket::Nack,
CEPPacketHeader::Stop => CEPPacket::Stop,
CEPPacketHeader::Eof => CEPPacket::Eof,
CEPPacketHeader::Data => {
let mut length_buffer = [0; 2];
reader.read_exact(&mut length_buffer)?;
let length = u16::from_le_bytes(length_buffer);

let mut data_buffer = vec![0; length as usize];
reader.read_exact(&mut data_buffer)?;

let mut crc_buffer = [0; 4];
reader.read_exact(&mut crc_buffer)?;
if !CEPPacket::crc_is_valid(&data_buffer, u32::from_le_bytes(crc_buffer)) {
return Err(CEPParseError::InvalidCRC);
}

CEPPacket::Data(data_buffer)
}
};

Ok(packet)
}
}

impl From<&CEPPacket> for Vec<u8> {
Expand All @@ -84,11 +116,20 @@ impl From<&CEPPacket> for Vec<u8> {
}
}

#[derive(Debug)]
#[derive(Debug, strum::Display)]
pub enum CEPParseError {
WrongLength,
InvalidHeader,
InvalidCRC,
Io(std::io::Error),
}

impl std::error::Error for CEPParseError {}

impl From<std::io::Error> for CEPParseError {
fn from(value: std::io::Error) -> Self {
CEPParseError::Io(value)
}
}

impl TryFrom<Vec<u8>> for CEPPacket {
Expand Down
Empty file removed src/communication/communication.rs
Empty file.
127 changes: 78 additions & 49 deletions src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,42 @@ use std::{
time::Duration,
};

use self::cep::CEPPacketHeader;
use self::cep::CEPParseError;

pub type ComResult<T> = Result<T, CommunicationError>;

pub trait CommunicationHandle: Read + Write {
const INTEGRITY_ACK_TIMEOUT: Duration;
const UNLIMITED_TIMEOUT: Duration;
const UNLIMITED_TIMEOUT: Duration = Duration::MAX;

const DATA_PACKET_RETRIES: usize = 4;

fn set_timeout(&mut self, timeout: &Duration);

fn send_packet(&mut self, packet: &CEPPacket) -> ComResult<()> {
self.write_all(&[packet.header()])?;
let bytes = Vec::from(packet);
self.write_all(&bytes)?;

if let CEPPacket::Data(data) = packet {
self.write_all(&(data.len() as u16).to_le_bytes())?;
self.write_all(data)?;
self.write_all(&packet.checksum().to_le_bytes())?;
self.flush()?;
if matches!(packet, CEPPacket::Data(_)) {
for _ in 0..Self::DATA_PACKET_RETRIES {
let response = self.receive_packet()?;
match response {
CEPPacket::Ack => return Ok(()),
CEPPacket::Nack => log::warn!("Received NACK after data packet; Retrying"),
p => {
log::error!("Received {p:?} after data packet");
return Err(CommunicationError::PacketInvalidError);
}
}

self.await_ack(&Self::INTEGRITY_ACK_TIMEOUT)?;
self.write_all(&bytes)?;
}
} else {
return Ok(());
}

Ok(())
log::error!("No ACK after {} retries, giving up", Self::DATA_PACKET_RETRIES);
Err(CommunicationError::PacketInvalidError)
}

fn send_multi_packet(&mut self, bytes: &[u8]) -> ComResult<()> {
Expand All @@ -44,37 +57,24 @@ pub trait CommunicationHandle: Read + Write {
}

fn receive_packet(&mut self) -> ComResult<CEPPacket> {
let mut header_buffer = [0; 1];
self.read_exact(&mut header_buffer)?;

let header = CEPPacketHeader::from_repr(header_buffer[0] as usize)
.ok_or(CommunicationError::PacketInvalidError)?;
let packet = match header {
CEPPacketHeader::Ack => CEPPacket::Ack,
CEPPacketHeader::Nack => CEPPacket::Nack,
CEPPacketHeader::Stop => CEPPacket::Stop,
CEPPacketHeader::Eof => CEPPacket::Eof,
CEPPacketHeader::Data => {
let mut length_buffer = [0; 2];
self.read_exact(&mut length_buffer)?;
let length = u16::from_le_bytes(length_buffer);

let mut data_buffer = vec![0; length as usize];
self.read_exact(&mut data_buffer)?;

let mut crc_buffer = [0; 4];
self.read_exact(&mut crc_buffer)?;
if !CEPPacket::crc_is_valid(&data_buffer, u32::from_le_bytes(crc_buffer)) {
return Err(CommunicationError::CRCError);
for _ in 0..Self::DATA_PACKET_RETRIES {
match CEPPacket::try_from_read(self) {
Ok(p @ CEPPacket::Data(_)) => {
self.send_packet(&CEPPacket::Ack)?;
return Ok(p);
}
Ok(p) => return Ok(p),
Err(CEPParseError::InvalidCRC) => {
log::warn!("Received data packet with invalid CRC; Retrying")
}
Err(e) => {
log::error!("Failed to read packet: {e:?}");
return Err(e.into());
}

self.send_packet(&CEPPacket::Ack)?;
CEPPacket::Data(data_buffer)
}
};
}

//self.set_timeout(&Duration::MAX);
Ok(packet)
todo!()
}

fn receive_multi_packet(&mut self, stop_fn: impl Fn() -> bool) -> ComResult<Vec<u8>> {
Expand Down Expand Up @@ -138,8 +138,8 @@ impl CommunicationHandle for Box<dyn serialport::SerialPort> {
pub enum CommunicationError {
/// Signals that an unknown command packet was received
PacketInvalidError,
/// Signals that the CRC checksum of a data packet was wrong
CRCError,
/// Relays an error from trying to parse a CEP packet
CepParsing(CEPParseError),
/// Signals that the underlying sending or receiving failed. Not recoverable on its own.
Io(std::io::Error),
/// Signals that a multi packet receive or send was interrupted by a Stop condition
Expand All @@ -158,18 +158,27 @@ impl std::fmt::Display for CommunicationError {

impl From<std::io::Error> for CommunicationError {
fn from(value: std::io::Error) -> Self {
if value.kind() == std::io::ErrorKind::TimedOut {
CommunicationError::TimedOut
} else {
CommunicationError::Io(value)
match value.kind() {
std::io::ErrorKind::TimedOut => CommunicationError::TimedOut,
std::io::ErrorKind::InvalidData => CommunicationError::PacketInvalidError,
_ => CommunicationError::Io(value),
}
}
}

impl From<CEPParseError> for CommunicationError {
fn from(value: CEPParseError) -> Self {
match value {
CEPParseError::Io(e) => Self::Io(e),
e => Self::CepParsing(e),
}
}
}

impl std::error::Error for CommunicationError {}

#[cfg(test)]
pub mod tests {
mod tests {
use super::*;
use test_case::test_case;

Expand Down Expand Up @@ -206,7 +215,6 @@ pub mod tests {
#[test_case(CEPPacket::Stop)]
#[test_case(CEPPacket::Eof)]
#[test_case(CEPPacket::Data(vec![1, 2, 3]))]

fn packet_is_sent_correctly(packet: CEPPacket) {
let mut com = TestComHandle::default();
com.data_to_read.append(&mut CEPPacket::Ack.serialize());
Expand All @@ -233,11 +241,32 @@ pub mod tests {
}

#[test]
fn error_on_nack() {
fn retry_on_nack() {
let mut com = TestComHandle::default();
com.data_to_read.append(&mut CEPPacket::Nack.serialize());
com.data_to_read.append(&mut CEPPacket::Nack.serialize());
com.data_to_read.append(&mut CEPPacket::Ack.serialize());

com.send_packet(&CEPPacket::Data(vec![1, 2, 3])).unwrap();

let mut expected = CEPPacket::Data(vec![1, 2, 3]).serialize();
expected.extend(CEPPacket::Data(vec![1, 2, 3]).serialize());
expected.extend(CEPPacket::Data(vec![1, 2, 3]).serialize());
assert_eq!(com.written_data, expected);
}

#[test]
fn fail_after_retries() {
let mut com = TestComHandle::default();
for _ in 0..TestComHandle::DATA_PACKET_RETRIES {
com.data_to_read.append(&mut CEPPacket::Nack.serialize());
}

let ret = com.send_packet(&CEPPacket::Data(vec![1, 2, 3])).unwrap_err();
assert!(matches!(ret, CommunicationError::NotAcknowledged));
assert!(
matches!(
com.send_packet(&CEPPacket::Data(vec![1, 2, 3])),
Err(CommunicationError::PacketInvalidError)
)
);
}
}

0 comments on commit 7fb2e65

Please sign in to comment.