Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propose Session trait and remove all the logic below the application-layer #20

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ maintenance = { status = "actively-developed" }

[dependencies]
embedded-hal = { version = "1.0.0-alpha.4" }
embedded-nal = "0.2.0"
nb = "^1"
heapless = { version = "^0.5.5", features = ["serde"] }
no-std-net = "0.4.0"
embedded-nal = { git = "https://github.com/BlackbirdHQ/embedded-nal", branch = "factbird-mini-1.0" }
mqttrs = { version = "0.4.0", default-features = false }

defmt = { version = "^0.1" }
Expand Down
57 changes: 13 additions & 44 deletions examples/common/network.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use embedded_nal::{AddrType, Dns, Mode, SocketAddr, TcpStack};
use embedded_nal::{AddrType, Dns, SocketAddr, TcpClient};
use heapless::{consts, String};
use no_std_net::IpAddr;
use std::io::{ErrorKind, Read, Write};
Expand All @@ -10,12 +10,11 @@ pub struct Network;

pub struct TcpSocket {
pub stream: Option<TcpStream>,
mode: Mode,
}

impl TcpSocket {
pub fn new(mode: Mode) -> Self {
TcpSocket { stream: None, mode }
pub fn new() -> Self {
TcpSocket { stream: None }
}
}

Expand All @@ -33,24 +32,15 @@ impl Dns for Network {
}
}

impl TcpStack for Network {
impl TcpClient for Network {
type Error = ();
type TcpSocket = TcpSocket;

fn open(&self, mode: Mode) -> Result<Self::TcpSocket, Self::Error> {
Ok(TcpSocket::new(mode))
fn socket(&self) -> Result<Self::TcpSocket, Self::Error> {
Ok(TcpSocket::new())
}

fn read_with<F>(&self, network: &mut Self::TcpSocket, f: F) -> nb::Result<usize, Self::Error>
where
F: FnOnce(&[u8], Option<&[u8]>) -> usize,
{
let buf = &mut [0u8; 512];
let len = self.read(network, buf)?;
Ok(f(&mut buf[..len], None))
}

fn read(
fn receive(
&self,
network: &mut Self::TcpSocket,
buf: &mut [u8],
Expand All @@ -65,7 +55,7 @@ impl TcpStack for Network {
}
}

fn write(
fn send(
&self,
network: &mut Self::TcpSocket,
buf: &[u8],
Expand All @@ -83,33 +73,12 @@ impl TcpStack for Network {

fn connect(
&self,
network: Self::TcpSocket,
network: &mut Self::TcpSocket,
remote: SocketAddr,
) -> Result<Self::TcpSocket, Self::Error> {
Ok(match TcpStream::connect(format!("{}", remote)) {
Ok(stream) => {
match network.mode {
Mode::Blocking => {
stream.set_write_timeout(None).unwrap();
stream.set_read_timeout(None).unwrap();
}
Mode::NonBlocking => panic!("Nonblocking socket mode not supported!"),
Mode::Timeout(t) => {
stream
.set_write_timeout(Some(std::time::Duration::from_millis(t as u64)))
.unwrap();
stream
.set_read_timeout(Some(std::time::Duration::from_millis(t as u64)))
.unwrap();
}
};
TcpSocket {
stream: Some(stream),
mode: network.mode,
}
}
Err(_e) => return Err(()),
})
) -> nb::Result<(), Self::Error> {
TcpStream::connect(format!("{}", remote))
.map(|stream| drop(network.stream.replace(stream)))
.map_err(|_| ().into())
}

fn close(&self, _network: Self::TcpSocket) -> Result<(), Self::Error> {
Expand Down
26 changes: 16 additions & 10 deletions examples/linux.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
mod common;

use embedded_nal::{AddrType, Dns, TcpClient};
use mqttrust::{
MqttEvent, MqttOptions, Notification, PublishRequest, QoS, Request, SubscribeRequest,
SubscribeTopic,
EventLoop, MqttOptions, Notification, PublishRequest, QoS, Request, SubscribeRequest,
SubscribeTopic, TcpSession,
};

use common::network::Network;
Expand All @@ -17,21 +18,26 @@ fn main() {
let (mut p, c) = unsafe { Q.split() };

let network = Network;
// let network = std_embedded_nal::STACK;
let mut socket = network.socket().unwrap();

// Connect to broker.hivemq.com:1883
let mut mqtt_eventloop = MqttEvent::new(
c,
SysTimer::new(),
MqttOptions::new("mqtt_test_client_id", "broker.hivemq.com".into(), 1883),
);
let broker_addr = network
.gethostbyname("broker.hivemq.com", AddrType::Either)
.unwrap();
network
.connect(&mut socket, (broker_addr, 1883).into())
.expect("TCP client cannot connect to the broker");
let mut session = TcpSession::from(socket);
let mut mqtt_eventloop =
EventLoop::new(c, SysTimer::new(), MqttOptions::new("mqtt_test_client_id"));

nb::block!(mqtt_eventloop.connect(&network)).expect("Failed to connect to MQTT");
nb::block!(mqtt_eventloop.connect(&network, &mut session))
.expect("MQTT client's connection request failed");

thread::Builder::new()
.name("eventloop".to_string())
.spawn(move || loop {
match nb::block!(mqtt_eventloop.yield_event(&network)) {
match nb::block!(mqtt_eventloop.yield_event(&network, &mut session)) {
Ok(Notification::Publish(_publish)) => {
// defmt::debug!(
// "[{}, {:?}]: {:?}",
Expand Down
Loading