Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle att mtu even if gatt is not enabled #46

Merged
merged 1 commit into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 13 additions & 29 deletions host/src/gatt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use bt_hci::param::ConnHandle;
use embassy_sync::blocking_mutex::raw::RawMutex;
use embassy_sync::channel::DynamicReceiver;

use crate::att::{self, Att, ATT_HANDLE_VALUE_NTF_OPTCODE};
use crate::att::{Att, ATT_HANDLE_VALUE_NTF_OPTCODE};
use crate::attribute::CharacteristicHandle;
use crate::attribute_server::AttributeServer;
use crate::connection::Connection;
Expand Down Expand Up @@ -39,13 +39,12 @@ impl<'reference, 'values, 'resources, M: RawMutex, T: Controller, const MAX: usi
let mut w = WriteCursor::new(response.as_mut());
let (mut header, mut data) = w.split(4)?;

match att {
Att::ExchangeMtu { mtu } => {
let mtu = self.connections.exchange_att_mtu(handle, mtu);
data.write(att::ATT_EXCHANGE_MTU_RESPONSE_OPCODE)?;
data.write(mtu)?;

header.write(data.len() as u16)?;
match self.server.process(handle, att, data.write_buf()) {
Ok(Some(written)) => {
let mtu = self.connections.get_att_mtu(handle);
data.commit(written)?;
data.truncate(mtu as usize);
header.write(written as u16)?;
header.write(4_u16)?;
let len = header.len() + data.len();
self.tx
Expand All @@ -54,27 +53,12 @@ impl<'reference, 'values, 'resources, M: RawMutex, T: Controller, const MAX: usi
.send(Pdu::new(response, len).as_ref())
.await?;
}
_ => match self.server.process(handle, att, data.write_buf()) {
Ok(Some(written)) => {
let mtu = self.connections.get_att_mtu(handle);
data.commit(written)?;
data.truncate(mtu as usize);
header.write(written as u16)?;
header.write(4_u16)?;
let len = header.len() + data.len();
self.tx
.acl(handle, 1)
.await?
.send(Pdu::new(response, len).as_ref())
.await?;
}
Ok(None) => {
debug!("No response sent");
}
Err(e) => {
warn!("Error processing attribute: {:?}", e);
}
},
Ok(None) => {
debug!("No response sent");
}
Err(e) => {
warn!("Error processing attribute: {:?}", e);
}
}
}
Err(e) => {
Expand Down
83 changes: 68 additions & 15 deletions host/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use bt_hci::param::{
FilterDuplicates, InitiatingPhy, LeEventMask, Operation, PhyParams, ScanningPhy,
};
use bt_hci::{ControllerToHostPacket, FromHciBytes, WriteHci};
use embassy_futures::select::{select, Either};
use embassy_futures::select::{select, select3, Either, Either3};
use embassy_sync::blocking_mutex::raw::NoopRawMutex;
use embassy_sync::channel::{Channel, TryReceiveError};
use embassy_sync::once_lock::OnceLock;
Expand All @@ -33,7 +33,7 @@ use futures::pin_mut;
use crate::advertise::{Advertisement, AdvertisementParameters, AdvertisementSet, RawAdvertisement};
use crate::channel_manager::{ChannelManager, ChannelStorage, PacketChannel};
use crate::connection::{ConnectConfig, Connection};
use crate::connection_manager::{ConnectionManager, ConnectionStorage, PacketGrant};
use crate::connection_manager::{ConnectionManager, ConnectionStorage, DynamicConnectionManager, PacketGrant};
use crate::cursor::WriteCursor;
use crate::l2cap::sar::{PacketReassembly, SarType, EMPTY_SAR};
use crate::packet_pool::{AllocId, GlobalPacketPool, PacketPool, Qos};
Expand All @@ -42,9 +42,9 @@ use crate::scan::{PhySet, ScanConfig, ScanReport};
use crate::types::l2cap::{
L2capHeader, L2capSignal, L2capSignalHeader, L2CAP_CID_ATT, L2CAP_CID_DYN_START, L2CAP_CID_LE_U_SIGNAL,
};
use crate::{att, Address, BleHostError, Error};
#[cfg(feature = "gatt")]
use crate::{attribute::AttributeTable, gatt::GattServer};
use crate::{Address, BleHostError, Error};

const L2CAP_RXQ: usize = 1;

Expand Down Expand Up @@ -97,6 +97,7 @@ pub struct BleHost<'d, T> {
pub(crate) channels: ChannelManager<'d, L2CAP_RXQ>,
pub(crate) att_inbound: Channel<NoopRawMutex, (ConnHandle, Pdu), 1>,
pub(crate) pool: &'static dyn GlobalPacketPool,
outbound: Channel<NoopRawMutex, (ConnHandle, Pdu), 1>,

pub(crate) scanner: Channel<NoopRawMutex, Option<ScanReport>, 1>,
advertiser_terminations: Channel<NoopRawMutex, AdvHandle, 1>,
Expand Down Expand Up @@ -138,6 +139,7 @@ where
att_inbound: Channel::new(),
scanner: Channel::new(),
advertiser_terminations: Channel::new(),
outbound: Channel::new(),
}
}

Expand Down Expand Up @@ -601,10 +603,18 @@ where
}

async fn handle_acl(&self, acl: AclPacket<'_>) -> Result<(), Error> {
let (header, packet) = match acl.boundary_flag() {
let (header, mut packet) = match acl.boundary_flag() {
AclPacketBoundary::FirstFlushable => {
let (header, data) = L2capHeader::from_hci_bytes(acl.data())?;

// Ignore channels we don't support
if header.channel < L2CAP_CID_DYN_START
&& !(&[L2CAP_CID_LE_U_SIGNAL, L2CAP_CID_ATT].contains(&header.channel))
{
warn!("[host] unsupported l2cap channel id {}", header.channel);
return Ok(());
}

// Avoids using the packet buffer for signalling packets
if header.channel == L2CAP_CID_LE_U_SIGNAL {
assert!(data.len() == header.length as usize);
Expand All @@ -614,7 +624,7 @@ where

let Some(mut p) = self.pool.alloc(AllocId::from_channel(header.channel)) else {
info!("No memory for packets on channel {}", header.channel);
return Err(Error::OutOfMemory);
return Err(Error::OutOfMemory.into());
};
p.as_mut()[..data.len()].copy_from_slice(data);

Expand All @@ -636,18 +646,40 @@ where
}
other => {
warn!("Unexpected boundary flag: {:?}!", other);
return Err(Error::NotSupported);
return Err(Error::NotSupported.into());
}
};

match header.channel {
L2CAP_CID_ATT => {
#[cfg(feature = "gatt")]
self.att_inbound
.send((acl.handle(), Pdu::new(packet, header.length as usize)))
.await;
#[cfg(not(feature = "gatt"))]
return Err(Error::NotSupported);
// Handle ATT MTU exchange here since it doesn't strictly require
// gatt to be enabled.
if let att::Att::ExchangeMtu { mtu } = att::Att::decode(&packet.as_ref()[..header.length as usize])? {
let mut w = WriteCursor::new(packet.as_mut());

let mtu = self.connections.exchange_att_mtu(acl.handle(), mtu);

let l2cap = L2capHeader {
channel: L2CAP_CID_ATT,
length: 3,
};

w.write_hci(&l2cap)?;
w.write(att::ATT_EXCHANGE_MTU_RESPONSE_OPCODE)?;
w.write(mtu)?;
let len = w.len();
w.finish();

self.outbound.send((acl.handle(), Pdu::new(packet, len))).await;
} else {
#[cfg(feature = "gatt")]
self.att_inbound
.send((acl.handle(), Pdu::new(packet, header.length as usize)))
.await;

#[cfg(not(feature = "gatt"))]
return Err(Error::NotSupported.into());
}
}
L2CAP_CID_LE_U_SIGNAL => {
panic!("le signalling channel was fragmented, impossible!");
Expand Down Expand Up @@ -753,6 +785,25 @@ where
};
pin_mut!(control_fut);

let tx_fut = async {
loop {
let (conn, pdu) = self.outbound.receive().await;
match self.acl(conn, 1).await {
Ok(mut sender) => {
if let Err(e) = sender.send(pdu.as_ref()).await {
warn!("[host] error sending outbound pdu");
return Err(e);
}
}
Err(e) => {
warn!("[host] error requesting sending outbound pdu");
return Err(e);
}
}
}
};
pin_mut!(tx_fut);

loop {
// Task handling receiving data from the controller.
let rx_fut = async {
Expand Down Expand Up @@ -848,9 +899,11 @@ where
};

// info!("Entering select loop");
let result: Result<(), BleHostError<T::Error>> = match select(&mut control_fut, rx_fut).await {
Either::First(result) => result,
Either::Second(result) => result,
let result: Result<(), BleHostError<T::Error>> = match select3(&mut control_fut, rx_fut, &mut tx_fut).await
{
Either3::First(result) => result,
Either3::Second(result) => result,
Either3::Third(result) => result,
};
result?;
}
Expand Down
7 changes: 7 additions & 0 deletions host/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub enum BleHostError<E> {
pub enum Error {
HciEncode(bt_hci::param::Error),
HciDecode(FromHciBytesError),
Att(att::AttDecodeError),
InsufficientSpace,
InvalidValue,
Advertisement(AdvertisementDataError),
Expand All @@ -95,6 +96,12 @@ impl<E> From<Error> for BleHostError<E> {
}
}

impl From<att::AttDecodeError> for Error {
fn from(error: att::AttDecodeError) -> Self {
Self::Att(error)
}
}

impl From<FromHciBytesError> for Error {
fn from(error: FromHciBytesError) -> Self {
Self::HciDecode(error)
Expand Down
12 changes: 3 additions & 9 deletions host/src/packet_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use embassy_sync::blocking_mutex::Mutex;
use crate::types::l2cap::{L2CAP_CID_ATT, L2CAP_CID_DYN_START};

// Generic client ID used by ATT PDU
#[cfg(feature = "gatt")]
pub(crate) const ATT_ID: AllocId = AllocId(0);

#[derive(Clone, Copy, Debug)]
Expand All @@ -16,22 +15,18 @@ pub struct AllocId(usize);
impl AllocId {
pub fn dynamic(idx: usize) -> AllocId {
// Dynamic range starts at 2
#[cfg(feature = "gatt")]
return AllocId(1 + idx);
#[cfg(not(feature = "gatt"))]
return AllocId(idx);
}

pub fn from_channel(cid: u16) -> AllocId {
match cid {
L2CAP_CID_ATT => {
#[cfg(feature = "gatt")]
return ATT_ID;
#[cfg(not(feature = "gatt"))]
panic!("gatt feature must be enabled to support gatt");
}
cid if cid >= L2CAP_CID_DYN_START => Self::dynamic((cid - L2CAP_CID_DYN_START) as usize),
_ => unimplemented!(),
cid => {
panic!("unexpected channel id {}", cid);
}
}
}
}
Expand Down Expand Up @@ -147,7 +142,6 @@ pub struct PacketPool<M: RawMutex, const MTU: usize, const N: usize, const CLIEN
impl<M: RawMutex, const MTU: usize, const N: usize, const CLIENTS: usize> PacketPool<M, MTU, N, CLIENTS> {
pub fn new(qos: Qos) -> Self {
// Need at least 1 for gatt
#[cfg(feature = "gatt")]
assert!(CLIENTS >= 1);
Self {
state: Mutex::new(State::new()),
Expand Down