From 7fb2e65b9b1a03ce2b6a490bfa010546977b612e Mon Sep 17 00:00:00 2001 From: Florian Guggi Date: Tue, 26 Dec 2023 20:10:17 +0100 Subject: [PATCH] properly separate communication handling and packet parsing --- src/command/error.rs | 2 +- src/communication/cep.rs | 55 +++++++++++-- src/communication/communication.rs | 0 src/communication/mod.rs | 127 ++++++++++++++++++----------- 4 files changed, 127 insertions(+), 57 deletions(-) delete mode 100644 src/communication/communication.rs diff --git a/src/command/error.rs b/src/command/error.rs index 566f7a9..74b2085 100644 --- a/src/command/error.rs +++ b/src/command/error.rs @@ -25,7 +25,7 @@ impl From for CommandError { fn from(e: CommunicationError) -> Self { match e { CommunicationError::PacketInvalidError => CommandError::External(Box::new(e)), - CommunicationError::CRCError => CommandError::ProtocolViolation(Box::new(e)), + CommunicationError::CepParsing(_) => CommandError::ProtocolViolation(Box::new(e)), CommunicationError::Io(_) => CommandError::NonRecoverable(Box::new(e)), CommunicationError::StopCondition => CommandError::External(Box::new(e)), CommunicationError::NotAcknowledged => CommandError::ProtocolViolation(Box::new(e)), diff --git a/src/communication/cep.rs b/src/communication/cep.rs index c4ec597..55e23b4 100644 --- a/src/communication/cep.rs +++ b/src/communication/cep.rs @@ -1,3 +1,5 @@ +use std::io::Read; + use crc::{Crc, CRC_32_MPEG_2}; #[derive(Debug, Clone, PartialEq, Eq)] @@ -34,20 +36,18 @@ impl CEPPacket { } pub fn serialize(self) -> Vec { + let header = self.header(); match self { - CEPPacket::Ack => vec![0xd7], - CEPPacket::Nack => vec![0x27], - CEPPacket::Stop => vec![0xb4], - CEPPacket::Eof => vec![0x59], CEPPacket::Data(bytes) => { - let mut v = vec![0x8b]; + let mut v = Vec::with_capacity(7 + bytes.len()); let crc32 = CEPPacket::CRC.checksum(&bytes); - v.reserve_exact(6 + bytes.len()); + v.push(header); v.extend((bytes.len() as u16).to_le_bytes()); v.extend(bytes); v.extend(crc32.to_le_bytes()); v } + _ => vec![header], } } @@ -65,6 +65,38 @@ impl CEPPacket { }; header as u8 } + + pub fn try_from_read(reader: &mut (impl Read + ?Sized)) -> Result { + let mut header_buffer = [0; 1]; + reader.read_exact(&mut header_buffer)?; + + let header = CEPPacketHeader::from_repr(header_buffer[0] as usize) + .ok_or(CEPParseError::WrongLength)?; + let packet = match header { + CEPPacketHeader::Ack => CEPPacket::Ack, + CEPPacketHeader::Nack => CEPPacket::Nack, + CEPPacketHeader::Stop => CEPPacket::Stop, + CEPPacketHeader::Eof => CEPPacket::Eof, + CEPPacketHeader::Data => { + let mut length_buffer = [0; 2]; + reader.read_exact(&mut length_buffer)?; + let length = u16::from_le_bytes(length_buffer); + + let mut data_buffer = vec![0; length as usize]; + reader.read_exact(&mut data_buffer)?; + + let mut crc_buffer = [0; 4]; + reader.read_exact(&mut crc_buffer)?; + if !CEPPacket::crc_is_valid(&data_buffer, u32::from_le_bytes(crc_buffer)) { + return Err(CEPParseError::InvalidCRC); + } + + CEPPacket::Data(data_buffer) + } + }; + + Ok(packet) + } } impl From<&CEPPacket> for Vec { @@ -84,11 +116,20 @@ impl From<&CEPPacket> for Vec { } } -#[derive(Debug)] +#[derive(Debug, strum::Display)] pub enum CEPParseError { WrongLength, InvalidHeader, InvalidCRC, + Io(std::io::Error), +} + +impl std::error::Error for CEPParseError {} + +impl From for CEPParseError { + fn from(value: std::io::Error) -> Self { + CEPParseError::Io(value) + } } impl TryFrom> for CEPPacket { diff --git a/src/communication/communication.rs b/src/communication/communication.rs deleted file mode 100644 index e69de29..0000000 diff --git a/src/communication/mod.rs b/src/communication/mod.rs index 8bd501c..096df87 100644 --- a/src/communication/mod.rs +++ b/src/communication/mod.rs @@ -6,29 +6,42 @@ use std::{ time::Duration, }; -use self::cep::CEPPacketHeader; +use self::cep::CEPParseError; pub type ComResult = Result; pub trait CommunicationHandle: Read + Write { const INTEGRITY_ACK_TIMEOUT: Duration; - const UNLIMITED_TIMEOUT: Duration; + const UNLIMITED_TIMEOUT: Duration = Duration::MAX; + + const DATA_PACKET_RETRIES: usize = 4; fn set_timeout(&mut self, timeout: &Duration); fn send_packet(&mut self, packet: &CEPPacket) -> ComResult<()> { - self.write_all(&[packet.header()])?; + let bytes = Vec::from(packet); + self.write_all(&bytes)?; - if let CEPPacket::Data(data) = packet { - self.write_all(&(data.len() as u16).to_le_bytes())?; - self.write_all(data)?; - self.write_all(&packet.checksum().to_le_bytes())?; - self.flush()?; + if matches!(packet, CEPPacket::Data(_)) { + for _ in 0..Self::DATA_PACKET_RETRIES { + let response = self.receive_packet()?; + match response { + CEPPacket::Ack => return Ok(()), + CEPPacket::Nack => log::warn!("Received NACK after data packet; Retrying"), + p => { + log::error!("Received {p:?} after data packet"); + return Err(CommunicationError::PacketInvalidError); + } + } - self.await_ack(&Self::INTEGRITY_ACK_TIMEOUT)?; + self.write_all(&bytes)?; + } + } else { + return Ok(()); } - Ok(()) + log::error!("No ACK after {} retries, giving up", Self::DATA_PACKET_RETRIES); + Err(CommunicationError::PacketInvalidError) } fn send_multi_packet(&mut self, bytes: &[u8]) -> ComResult<()> { @@ -44,37 +57,24 @@ pub trait CommunicationHandle: Read + Write { } fn receive_packet(&mut self) -> ComResult { - let mut header_buffer = [0; 1]; - self.read_exact(&mut header_buffer)?; - - let header = CEPPacketHeader::from_repr(header_buffer[0] as usize) - .ok_or(CommunicationError::PacketInvalidError)?; - let packet = match header { - CEPPacketHeader::Ack => CEPPacket::Ack, - CEPPacketHeader::Nack => CEPPacket::Nack, - CEPPacketHeader::Stop => CEPPacket::Stop, - CEPPacketHeader::Eof => CEPPacket::Eof, - CEPPacketHeader::Data => { - let mut length_buffer = [0; 2]; - self.read_exact(&mut length_buffer)?; - let length = u16::from_le_bytes(length_buffer); - - let mut data_buffer = vec![0; length as usize]; - self.read_exact(&mut data_buffer)?; - - let mut crc_buffer = [0; 4]; - self.read_exact(&mut crc_buffer)?; - if !CEPPacket::crc_is_valid(&data_buffer, u32::from_le_bytes(crc_buffer)) { - return Err(CommunicationError::CRCError); + for _ in 0..Self::DATA_PACKET_RETRIES { + match CEPPacket::try_from_read(self) { + Ok(p @ CEPPacket::Data(_)) => { + self.send_packet(&CEPPacket::Ack)?; + return Ok(p); + } + Ok(p) => return Ok(p), + Err(CEPParseError::InvalidCRC) => { + log::warn!("Received data packet with invalid CRC; Retrying") + } + Err(e) => { + log::error!("Failed to read packet: {e:?}"); + return Err(e.into()); } - - self.send_packet(&CEPPacket::Ack)?; - CEPPacket::Data(data_buffer) } - }; + } - //self.set_timeout(&Duration::MAX); - Ok(packet) + todo!() } fn receive_multi_packet(&mut self, stop_fn: impl Fn() -> bool) -> ComResult> { @@ -138,8 +138,8 @@ impl CommunicationHandle for Box { pub enum CommunicationError { /// Signals that an unknown command packet was received PacketInvalidError, - /// Signals that the CRC checksum of a data packet was wrong - CRCError, + /// Relays an error from trying to parse a CEP packet + CepParsing(CEPParseError), /// Signals that the underlying sending or receiving failed. Not recoverable on its own. Io(std::io::Error), /// Signals that a multi packet receive or send was interrupted by a Stop condition @@ -158,10 +158,19 @@ impl std::fmt::Display for CommunicationError { impl From for CommunicationError { fn from(value: std::io::Error) -> Self { - if value.kind() == std::io::ErrorKind::TimedOut { - CommunicationError::TimedOut - } else { - CommunicationError::Io(value) + match value.kind() { + std::io::ErrorKind::TimedOut => CommunicationError::TimedOut, + std::io::ErrorKind::InvalidData => CommunicationError::PacketInvalidError, + _ => CommunicationError::Io(value), + } + } +} + +impl From for CommunicationError { + fn from(value: CEPParseError) -> Self { + match value { + CEPParseError::Io(e) => Self::Io(e), + e => Self::CepParsing(e), } } } @@ -169,7 +178,7 @@ impl From for CommunicationError { impl std::error::Error for CommunicationError {} #[cfg(test)] -pub mod tests { +mod tests { use super::*; use test_case::test_case; @@ -206,7 +215,6 @@ pub mod tests { #[test_case(CEPPacket::Stop)] #[test_case(CEPPacket::Eof)] #[test_case(CEPPacket::Data(vec![1, 2, 3]))] - fn packet_is_sent_correctly(packet: CEPPacket) { let mut com = TestComHandle::default(); com.data_to_read.append(&mut CEPPacket::Ack.serialize()); @@ -233,11 +241,32 @@ pub mod tests { } #[test] - fn error_on_nack() { + fn retry_on_nack() { let mut com = TestComHandle::default(); com.data_to_read.append(&mut CEPPacket::Nack.serialize()); + com.data_to_read.append(&mut CEPPacket::Nack.serialize()); + com.data_to_read.append(&mut CEPPacket::Ack.serialize()); + + com.send_packet(&CEPPacket::Data(vec![1, 2, 3])).unwrap(); + + let mut expected = CEPPacket::Data(vec![1, 2, 3]).serialize(); + expected.extend(CEPPacket::Data(vec![1, 2, 3]).serialize()); + expected.extend(CEPPacket::Data(vec![1, 2, 3]).serialize()); + assert_eq!(com.written_data, expected); + } + + #[test] + fn fail_after_retries() { + let mut com = TestComHandle::default(); + for _ in 0..TestComHandle::DATA_PACKET_RETRIES { + com.data_to_read.append(&mut CEPPacket::Nack.serialize()); + } - let ret = com.send_packet(&CEPPacket::Data(vec![1, 2, 3])).unwrap_err(); - assert!(matches!(ret, CommunicationError::NotAcknowledged)); + assert!( + matches!( + com.send_packet(&CEPPacket::Data(vec![1, 2, 3])), + Err(CommunicationError::PacketInvalidError) + ) + ); } }