Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle embedded_io_async::Read/Write Guarantee #235

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
39 changes: 21 additions & 18 deletions rmk/src/split/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::CONNECTION_STATE;
use crate::{event::KeyEvent, keyboard::KEY_EVENT_CHANNEL};
use defmt::{debug, error, warn};
use embassy_futures::select::select;
use heapless::Vec;

#[derive(Debug, Clone, Copy, defmt::Format)]
pub(crate) enum SplitDriverError {
Expand All @@ -19,7 +20,7 @@ pub(crate) enum SplitDriverError {

/// Split message reader from other split devices
pub(crate) trait SplitReader {
async fn read(&mut self) -> Result<SplitMessage, SplitDriverError>;
async fn read(&mut self) -> Result<Vec<SplitMessage, 2>, SplitDriverError>;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the trait, each read call should read only one SplitMessage, not two. It defines "expected behavior", should not be affected by the serial implementation

}

/// Split message writer to other split devices
Expand Down Expand Up @@ -76,24 +77,26 @@ impl<
loop {
match select(self.receiver.read(), embassy_time::Timer::after_millis(500)).await {
embassy_futures::select::Either::First(read_result) => match read_result {
Ok(received_message) => {
debug!("Received peripheral message: {}", received_message);
if let SplitMessage::Key(e) = received_message {
// Check row/col
if e.row as usize > ROW || e.col as usize > COL {
error!("Invalid peripheral row/col: {} {}", e.row, e.col);
continue;
}
Ok(received_messages) => {
for received_message in received_messages {
debug!("Received peripheral message: {}", received_message);
if let SplitMessage::Key(e) = received_message {
// Check row/col
if e.row as usize > ROW || e.col as usize > COL {
error!("Invalid peripheral row/col: {} {}", e.row, e.col);
continue;
}

if CONNECTION_STATE.load(core::sync::atomic::Ordering::Acquire) {
// Only when the connection is established, send the key event.
KEY_EVENT_CHANNEL
.send(KeyEvent {
row: e.row + ROW_OFFSET as u8,
col: e.col + COL_OFFSET as u8,
pressed: e.pressed,
})
.await;
if CONNECTION_STATE.load(core::sync::atomic::Ordering::Acquire) {
// Only when the connection is established, send the key event.
KEY_EVENT_CHANNEL
.send(KeyEvent {
row: e.row + ROW_OFFSET as u8,
col: e.col + COL_OFFSET as u8,
pressed: e.pressed,
})
.await;
}
} else {
warn!("Key event from peripheral is ignored because the connection is not established.");
}
Expand Down
2 changes: 1 addition & 1 deletion rmk/src/split/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub mod peripheral;
pub mod serial;

/// Maximum size of a split message
pub const SPLIT_MESSAGE_MAX_SIZE: usize = SplitMessage::POSTCARD_MAX_SIZE + 4;
pub const SPLIT_MESSAGE_MAX_SIZE: usize = SplitMessage::POSTCARD_MAX_SIZE + 6;

/// Message used from central & peripheral communication
#[derive(Serialize, Deserialize, Debug, Clone, Copy, MaxSize, defmt::Format)]
Expand Down
14 changes: 9 additions & 5 deletions rmk/src/split/peripheral.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,16 @@ impl<S: SplitWriter + SplitReader> SplitPeripheral<S> {
loop {
match select(self.split_driver.read(), KEY_EVENT_CHANNEL.receive()).await {
embassy_futures::select::Either::First(m) => match m {
// Currently only handle the central state message
Ok(split_message) => match split_message {
SplitMessage::ConnectionState(state) => {
CONNECTION_STATE.store(state, core::sync::atomic::Ordering::Release);
Ok(split_messages) => {
for split_message in split_messages {
// Currently only handle the central state message
match split_message {
SplitMessage::ConnectionState(state) => {
CONNECTION_STATE.store(state, core::sync::atomic::Ordering::Release);
}
_ => (),
}
}
_ => (),
},
Err(e) => {
error!("Split message read error: {:?}", e);
Expand Down
87 changes: 67 additions & 20 deletions rmk/src/split/serial/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::{
};

use super::driver::SplitDriverError;
use heapless::Vec;

// Receive split message from peripheral via serial and process it
///
Expand Down Expand Up @@ -42,44 +43,90 @@ pub(crate) async fn run_serial_peripheral_monitor<
/// Serial driver for BOTH split central and peripheral
pub(crate) struct SerialSplitDriver<S: Read + Write> {
serial: S,
buffer: [u8; SPLIT_MESSAGE_MAX_SIZE],
n_bytes_part: usize,
}

impl<S: Read + Write> SerialSplitDriver<S> {
pub(crate) fn new(serial: S) -> Self {
Self { serial }
Self {
serial,
buffer: [0_u8; SPLIT_MESSAGE_MAX_SIZE],
n_bytes_part: 0,
}
}
}

impl<S: Read + Write> SplitReader for SerialSplitDriver<S> {
async fn read(&mut self) -> Result<SplitMessage, SplitDriverError> {
let mut buf = [0_u8; SPLIT_MESSAGE_MAX_SIZE];
let n_bytes = self
.serial
.read(&mut buf)
.await
.map_err(|_e| SplitDriverError::SerialError)?;
if n_bytes == 0 {
return Err(SplitDriverError::EmptyMessage);
async fn read(&mut self) -> Result<Vec<SplitMessage, 2>, SplitDriverError> {
const SENTINEL: u8 = 0x00;
let mut messages = Vec::new();
while self.n_bytes_part < self.buffer.len() {
let n_bytes = self
.serial
.read(&mut self.buffer[self.n_bytes_part..])
.await
.inspect_err(|_e| self.n_bytes_part = 0)
.map_err(|_e| SplitDriverError::SerialError)?;
if n_bytes == 0 {
return Err(SplitDriverError::EmptyMessage);
}

self.n_bytes_part += n_bytes;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check if n_bytes_part > self.buffer.len()

if self.buffer[..self.n_bytes_part].contains(&SENTINEL) {
break;
}
}
let message: SplitMessage = postcard::from_bytes(&buf).map_err(|e| {
error!("Postcard deserialize split message error: {}", e);
SplitDriverError::DeserializeError
})?;
Ok(message)

let mut start_byte = 0;
let mut end_byte = start_byte;
while end_byte < self.n_bytes_part {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The buffer size is SPLIT_MESSAGE_MAX_SIZE, so this second loop is not needed -- we can assume that after the first while loop, there's a complete SplitMessage received.

let value = self.buffer[end_byte];
if value == SENTINEL {
postcard::from_bytes_cobs(&mut self.buffer[start_byte..=end_byte]).map_or_else(
|e| error!("Postcard deserialize split message error: {}", e),
|message| {
messages
.push(message)
.unwrap_or_else(|_m| error!("Split message vector full"));
},
);
start_byte = end_byte + 1;
}

end_byte += 1;
}

if start_byte != self.n_bytes_part {
// Store Partial Message for Next Read
self.buffer.copy_within(start_byte..self.n_bytes_part, 0);
self.n_bytes_part = self.n_bytes_part - start_byte;
} else {
// Reset Buffer
self.n_bytes_part = 0;
}

Ok(messages)
}
}

impl<S: Read + Write> SplitWriter for SerialSplitDriver<S> {
async fn write(&mut self, message: &SplitMessage) -> Result<usize, SplitDriverError> {
let mut buf = [0_u8; SPLIT_MESSAGE_MAX_SIZE];
let bytes = postcard::to_slice(message, &mut buf).map_err(|e| {
let bytes = postcard::to_slice_cobs(message, &mut buf).map_err(|e| {
error!("Postcard serialize split message error: {}", e);
SplitDriverError::SerializeError
})?;
self.serial
.write(bytes)
.await
.map_err(|_e| SplitDriverError::SerialError)
let mut remaining_bytes = bytes.len();
while remaining_bytes > 0 {
let sent_bytes = self
.serial
.write(&bytes[bytes.len() - remaining_bytes..])
.await
.map_err(|_e| SplitDriverError::SerialError)?;
remaining_bytes -= sent_bytes;
}
Ok(bytes.len())
}
}

Expand Down