From c21c6deecf7440ba9b3087afc05f450500b15dd8 Mon Sep 17 00:00:00 2001 From: Dan Nixon Date: Fri, 6 Dec 2024 12:24:38 +0000 Subject: [PATCH 1/2] telemetry-module: improve networking - upgrade embassy-net - add MQTT pings - tidy MQTT pubish code --- telemetry-module/firmware/Cargo.lock | 123 ++------------ telemetry-module/firmware/Cargo.toml | 4 +- telemetry-module/firmware/src/wifi.rs | 235 +++++++++++++------------- 3 files changed, 138 insertions(+), 224 deletions(-) diff --git a/telemetry-module/firmware/Cargo.lock b/telemetry-module/firmware/Cargo.lock index aab92aff..96536b93 100644 --- a/telemetry-module/firmware/Cargo.lock +++ b/telemetry-module/firmware/Cargo.lock @@ -17,27 +17,6 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" -[[package]] -name = "as-slice" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45403b49e3954a4b8428a0ac21a4b7afadccf92bfd96273f1a58cd4812496ae0" -dependencies = [ - "generic-array 0.12.4", - "generic-array 0.13.3", - "generic-array 0.14.7", - "stable_deref_trait", -] - -[[package]] -name = "as-slice" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "516b6b4f0e40d50dcda9365d53964ec74560ad4284da2e7fc97122cd83174516" -dependencies = [ - "stable_deref_trait", -] - [[package]] name = "ascii-canvas" version = "3.0.0" @@ -62,18 +41,6 @@ dependencies = [ "critical-section", ] -[[package]] -name = "atomic-pool" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58c5fc22e05ec2884db458bf307dc7b278c9428888d2b6e6fad9c0ae7804f5f6" -dependencies = [ - "as-slice 0.1.5", - "as-slice 0.2.1", - "atomic-polyfill", - "stable_deref_trait", -] - [[package]] name = "autocfg" version = "1.4.0" @@ -232,7 +199,7 @@ dependencies = [ "defmt", "embassy-futures", "embassy-net-driver-channel", - "embassy-sync 0.6.0", + "embassy-sync", "embassy-time", "embedded-hal 1.0.0", "futures", @@ -405,7 +372,7 @@ checksum = "5794414bc20e0d750f145bc0e82366b19dd078e9e075e8331fb8dd069a1cb6a2" dependencies = [ "defmt", "embassy-futures", - "embassy-sync 0.6.0", + "embassy-sync", "embassy-time", "embedded-hal 0.2.7", "embedded-hal 1.0.0", @@ -462,25 +429,20 @@ dependencies = [ [[package]] name = "embassy-net" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55cf91dd36dfd623de32242af711fd294d41159f02130052fc93c5c5ba93febe" +checksum = "49f9f2979069031c153e41075a43074c36a64492e598780b27944a605f829d23" dependencies = [ - "as-slice 0.2.1", - "atomic-pool", "defmt", "document-features", "embassy-net-driver", - "embassy-sync 0.5.0", + "embassy-sync", "embassy-time", "embedded-io-async", "embedded-nal-async", - "futures", - "generic-array 0.14.7", "heapless 0.8.0", "managed", "smoltcp", - "stable_deref_trait", ] [[package]] @@ -500,7 +462,7 @@ checksum = "4818c32afec43e3cae234f324bad9a976c9aa7501022d26ff60a4017a1a006b7" dependencies = [ "embassy-futures", "embassy-net-driver", - "embassy-sync 0.6.0", + "embassy-sync", ] [[package]] @@ -519,7 +481,7 @@ dependencies = [ "embassy-embedded-hal", "embassy-futures", "embassy-hal-internal", - "embassy-sync 0.6.0", + "embassy-sync", "embassy-time", "embassy-time-driver", "embassy-usb-driver", @@ -542,26 +504,14 @@ dependencies = [ [[package]] name = "embassy-sync" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd938f25c0798db4280fcd8026bf4c2f48789aebf8f77b6e5cf8a7693ba114ec" -dependencies = [ - "cfg-if", - "critical-section", - "embedded-io-async", - "futures-util", - "heapless 0.8.0", -] - -[[package]] -name = "embassy-sync" -version = "0.6.0" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3e0c49ff02ebe324faf3a8653ba91582e2d0a7fdef5bc88f449d5aa1bfcc05c" +checksum = "3899a6e39fa3f54bf8aaf00979f9f9c0145a522f7244810533abbb748be6ce82" dependencies = [ "cfg-if", "critical-section", "embedded-io-async", + "futures-sink", "futures-util", "heapless 0.8.0", ] @@ -684,23 +634,21 @@ dependencies = [ [[package]] name = "embedded-nal" -version = "0.8.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8a943fad5ed3d3f8a00f1e80f6bba371f1e7f0df28ec38477535eb318dc19cc" +checksum = "c56a28be191a992f28f178ec338a0bf02f63d7803244add736d026a471e6ed77" dependencies = [ "nb 1.1.0", - "no-std-net", ] [[package]] name = "embedded-nal-async" -version = "0.7.1" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72229137a4fc12d239b0b7f50f04b30790678da6d782a0f3f1909bf57ec4b759" +checksum = "76959917cd2b86f40a98c28dd5624eddd1fa69d746241c8257eac428d83cb211" dependencies = [ "embedded-io-async", "embedded-nal", - "no-std-net", ] [[package]] @@ -861,34 +809,6 @@ dependencies = [ "pin-utils", ] -[[package]] -name = "generic-array" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffdf9f34f1447443d37393cc6c2b8313aebddcd96906caf34e54c68d8e57d7bd" -dependencies = [ - "typenum", -] - -[[package]] -name = "generic-array" -version = "0.13.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f797e67af32588215eaaab8327027ee8e71b9dd0b2b26996aedf20c030fce309" -dependencies = [ - "typenum", -] - -[[package]] -name = "generic-array" -version = "0.14.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" -dependencies = [ - "typenum", - "version_check", -] - [[package]] name = "getrandom" version = "0.2.15" @@ -1003,7 +923,7 @@ dependencies = [ "embassy-futures", "embassy-net", "embassy-rp", - "embassy-sync 0.6.0", + "embassy-sync", "embassy-time", "embassy-time-queue-driver", "embedded-graphics", @@ -1187,12 +1107,6 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" -[[package]] -name = "no-std-net" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43794a0ace135be66a25d3ae77d41b91615fb68ae937f904090203e81f755b65" - [[package]] name = "num-traits" version = "0.2.19" @@ -1498,8 +1412,7 @@ dependencies = [ [[package]] name = "rust-mqtt" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f71160765f368fd9a84e0955e2ddb6d64ac9018fee1c5323354d6d08c816b40" +source = "git+https://github.com/DanNixon/rust-mqtt#9e6d7f30053776c3ebe6c0d7861376e2ab1641f3" dependencies = [ "defmt", "embedded-io", @@ -1611,9 +1524,9 @@ checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "smoltcp" -version = "0.11.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a1a996951e50b5971a2c8c0fa05a381480d70a933064245c4a223ddc87ccc97" +checksum = "dad095989c1533c1c266d9b1e8d70a1329dd3723c3edac6d03bbd67e7bf6f4bb" dependencies = [ "bitflags 1.3.2", "byteorder", diff --git a/telemetry-module/firmware/Cargo.toml b/telemetry-module/firmware/Cargo.toml index 3dadfc3d..a5f30e88 100644 --- a/telemetry-module/firmware/Cargo.toml +++ b/telemetry-module/firmware/Cargo.toml @@ -14,7 +14,7 @@ assign-resources = "0.4.1" # Wifi cyw43 = { version = "0.2.0", features = ["defmt", "firmware-logs"] } cyw43-pio = { version = "0.2.0", features = ["defmt"] } -embassy-net = { version = "0.4.0", features = ["defmt", "dhcpv4", "dns", "proto-ipv4", "tcp", "udp"] } +embassy-net = { version = "0.5.0", features = ["defmt", "dhcpv4", "dns", "proto-ipv4", "tcp", "udp"] } rand = { version = "0.8.5", default-features = false } embassy-executor = { version = "0.6.1", features = ["task-arena-size-98304", "arch-cortex-m", "executor-thread", "executor-interrupt", "defmt", "integrated-timers"] } @@ -46,7 +46,7 @@ mipidsi = "0.8.0" display-interface-spi = "0.5.0" # MQTT telemetry TX -rust-mqtt = { version = "0.3.0", default-features = false, features = ["defmt"] } +rust-mqtt = { git = "https://github.com/DanNixon/rust-mqtt", default-features = false, features = ["defmt"] } serde-json-core = { version = "0.6.0", features = ["defmt"] } [profile.release] diff --git a/telemetry-module/firmware/src/wifi.rs b/telemetry-module/firmware/src/wifi.rs index 768433cd..31039f02 100644 --- a/telemetry-module/firmware/src/wifi.rs +++ b/telemetry-module/firmware/src/wifi.rs @@ -1,8 +1,9 @@ use crate::telemetry::TELEMETRY_MESSAGES; use cyw43::{PowerManagementMode, State}; use cyw43_pio::PioSpi; -use defmt::{info, unwrap, warn}; +use defmt::{debug, info, unwrap, warn}; use embassy_executor::Spawner; +use embassy_futures::select::{select, Either}; use embassy_net::{ tcp::TcpSocket, Config, IpAddress, Ipv4Address, Stack, StackResources, StaticConfigV4, }; @@ -16,7 +17,7 @@ use embassy_rp::{ use embassy_sync::{ blocking_mutex::raw::CriticalSectionRawMutex, channel::Channel, pubsub::WaitResult, }; -use embassy_time::{Duration, Timer}; +use embassy_time::{Duration, Ticker, Timer}; use rand::RngCore; use rust_mqtt::{ client::{ @@ -40,6 +41,8 @@ const ONLINE_MQTT_TOPIC: &str = "hoshiguma/telemetry-module/online"; const VERSION_MQTT_TOPIC: &str = "hoshiguma/telemetry-module/version"; const TELEMETRY_MQTT_TOPIC: &str = "hoshiguma/events"; +const MQTT_BUFFER_SIZE: usize = 512; + #[derive(Clone)] pub(crate) enum NetworkEvent { NetworkConnected(StaticConfigV4), @@ -55,6 +58,18 @@ bind_interrupts!(struct Irqs { PIO0_IRQ_0 => InterruptHandler; }); +#[embassy_executor::task] +async fn cyw43_task( + runner: cyw43::Runner<'static, Output<'static>, PioSpi<'static, PIO0, 0, DMA_CH0>>, +) -> ! { + runner.run().await +} + +#[embassy_executor::task] +async fn net_task(mut runner: embassy_net::Runner<'static, cyw43::NetDriver<'static>>) -> ! { + runner.run().await +} + #[embassy_executor::task] pub(super) async fn task(r: crate::WifiResources, spawner: Spawner) { let pwr = Output::new(r.pwr, Level::Low); @@ -89,25 +104,14 @@ pub(super) async fn task(r: crate::WifiResources, spawner: Spawner) { let mut rng = RoscRng; let seed = rng.next_u64(); - static STACK: StaticCell>> = StaticCell::new(); static RESOURCES: StaticCell> = StaticCell::new(); - let stack = &*STACK.init(Stack::new( + let (stack, runner) = embassy_net::new( net_device, Config::dhcpv4(Default::default()), RESOURCES.init(StackResources::<4>::new()), seed, - )); - - unwrap!(spawner.spawn(net_task(stack))); - - let mut rx_buffer = [0; 4096]; - let mut tx_buffer = [0; 4096]; - - const MQTT_BUFFER_SIZE: usize = 512; - let mut mqtt_rx_buffer = [0; MQTT_BUFFER_SIZE]; - let mut mqtt_tx_buffer = [0; MQTT_BUFFER_SIZE]; - - let mut telem_rx = TELEMETRY_MESSAGES.subscriber().unwrap(); + ); + unwrap!(spawner.spawn(net_task(runner))); info!("Joining WiFi network {}", WIFI_SSID); loop { @@ -134,86 +138,103 @@ pub(super) async fn task(r: crate::WifiResources, spawner: Spawner) { } loop { - let mut socket = TcpSocket::new(stack, &mut rx_buffer, &mut tx_buffer); - socket.set_timeout(Some(Duration::from_secs(10))); - - info!( - "Connecting to MQTT broker {}:{}", - MQTT_BROKER_IP, MQTT_BROKER_PORT - ); - let connection = socket.connect((MQTT_BROKER_IP, MQTT_BROKER_PORT)).await; - if let Err(e) = connection { - warn!("Broker socket connection error: {:?}", e); - continue; + // Start the MQTT client + if run_mqtt_client(stack).await.is_err() { + // Notify of MQTT broker connection loss + NETWORK_EVENTS + .send(NetworkEvent::MqttBrokerDisconnected) + .await; } - let mut client = { - let mut config = ClientConfig::new(MqttVersion::MQTTv5, CountingRng(20000)); - config.add_client_id(MQTT_CLIENT_ID); - config.add_username(MQTT_USERNAME); - config.add_password(env!("MQTT_PASSWORD")); - config.max_packet_size = MQTT_BUFFER_SIZE as u32; - config.add_will(ONLINE_MQTT_TOPIC, b"false", true); - - MqttClient::<_, 5, _>::new( - socket, - &mut mqtt_tx_buffer, - MQTT_BUFFER_SIZE, - &mut mqtt_rx_buffer, - MQTT_BUFFER_SIZE, - config, - ) - }; - - match client.connect_to_broker().await { - Ok(()) => { - info!("Connected to MQTT broker"); - NETWORK_EVENTS.send(NetworkEvent::MqttBrokerConnected).await; - } - Err(e) => { - warn!("MQTT error: {:?}", e); - NETWORK_EVENTS - .send(NetworkEvent::MqttBrokerDisconnected) - .await; - continue; - } - } + // Wait a little bit of time before connecting again + Timer::after_millis(500).await; + } +} - match client - .send_message(ONLINE_MQTT_TOPIC, b"true", QualityOfService::QoS1, true) - .await - { - Ok(()) => {} - Err(e) => { - warn!("MQTT error: {:?}", e); - NETWORK_EVENTS - .send(NetworkEvent::MqttBrokerDisconnected) - .await; - continue; - } - } +async fn run_mqtt_client(stack: Stack<'_>) -> Result<(), ()> { + let mut rx_buffer = [0; 4096]; + let mut tx_buffer = [0; 4096]; - match client - .send_message( - VERSION_MQTT_TOPIC, - git_version::git_version!().as_bytes(), - QualityOfService::QoS1, - true, - ) - .await - { - Ok(()) => {} - Err(e) => { - warn!("MQTT error: {:?}", e); - NETWORK_EVENTS - .send(NetworkEvent::MqttBrokerDisconnected) - .await; - continue; - } + let mut mqtt_rx_buffer = [0; MQTT_BUFFER_SIZE]; + let mut mqtt_tx_buffer = [0; MQTT_BUFFER_SIZE]; + + let mut socket = TcpSocket::new(stack, &mut rx_buffer, &mut tx_buffer); + socket.set_timeout(Some(Duration::from_secs(10))); + + info!( + "Connecting to MQTT broker {}:{}", + MQTT_BROKER_IP, MQTT_BROKER_PORT + ); + socket + .connect((MQTT_BROKER_IP, MQTT_BROKER_PORT)) + .await + .map_err(|e| { + warn!("Broker socket connection error: {:?}", e); + })?; + + let mut client = { + let mut config = ClientConfig::new(MqttVersion::MQTTv5, CountingRng(20000)); + config.add_client_id(MQTT_CLIENT_ID); + config.add_username(MQTT_USERNAME); + config.add_password(env!("MQTT_PASSWORD")); + config.max_packet_size = MQTT_BUFFER_SIZE as u32; + config.add_will(ONLINE_MQTT_TOPIC, b"false", true); + + MqttClient::<_, 5, _>::new( + socket, + &mut mqtt_tx_buffer, + MQTT_BUFFER_SIZE, + &mut mqtt_rx_buffer, + MQTT_BUFFER_SIZE, + config, + ) + }; + + match client.connect_to_broker().await { + Ok(()) => { + info!("Connected to MQTT broker"); + NETWORK_EVENTS.send(NetworkEvent::MqttBrokerConnected).await; + } + Err(e) => { + warn!("MQTT error: {:?}", e); + return Err(()); } + } + + client + .send_message(ONLINE_MQTT_TOPIC, b"true", QualityOfService::QoS1, true) + .await + .map_err(|e| { + warn!("MQTT publish error: {:?}", e); + })?; + + client + .send_message( + VERSION_MQTT_TOPIC, + git_version::git_version!().as_bytes(), + QualityOfService::QoS1, + true, + ) + .await + .map_err(|e| { + warn!("MQTT publish error: {:?}", e); + })?; + + let mut ping_tick = Ticker::every(Duration::from_secs(5)); + let mut telem_rx = TELEMETRY_MESSAGES.subscriber().unwrap(); - loop { - match telem_rx.next_message().await { + loop { + match select(ping_tick.next(), telem_rx.next_message()).await { + Either::First(_) => match client.send_ping().await { + Ok(_) => { + debug!("MQTT ping OK"); + } + Err(e) => { + warn!("MQTT ping error: {:?}", e); + return Err(()); + } + }, + Either::Second(msg) => match msg { WaitResult::Lagged(msg_count) => { warn!( "Telemetry message receiver lagged, missed {} messages", @@ -221,44 +242,24 @@ pub(super) async fn task(r: crate::WifiResources, spawner: Spawner) { ); } WaitResult::Message(msg) => { - let veccy = serde_json_core::to_vec::<_, MQTT_BUFFER_SIZE>(&msg); - match veccy { + match serde_json_core::to_vec::<_, MQTT_BUFFER_SIZE>(&msg) { Ok(data) => { - match client + client .send_message( TELEMETRY_MQTT_TOPIC, &data, QualityOfService::QoS1, - true, + false, ) .await - { - Ok(()) => {} - Err(e) => { - warn!("MQTT error: {:?}", e); - NETWORK_EVENTS - .send(NetworkEvent::MqttBrokerDisconnected) - .await; - continue; - } - } + .map_err(|e| { + warn!("MQTT publish error: {:?}", e); + })?; } Err(e) => warn!("Cannot JSON serialise message: {}", e), } } - } + }, } } } - -#[embassy_executor::task] -async fn cyw43_task( - runner: cyw43::Runner<'static, Output<'static>, PioSpi<'static, PIO0, 0, DMA_CH0>>, -) -> ! { - runner.run().await -} - -#[embassy_executor::task] -async fn net_task(stack: &'static Stack>) -> ! { - stack.run().await -} From 8390258386dc38c8e2972919510455c00ec264d8 Mon Sep 17 00:00:00 2001 From: Dan Nixon Date: Sat, 7 Dec 2024 13:53:54 +0000 Subject: [PATCH 2/2] telemetry-module: change display log level --- telemetry-module/firmware/src/display/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/telemetry-module/firmware/src/display/mod.rs b/telemetry-module/firmware/src/display/mod.rs index ef7b7995..a078db37 100644 --- a/telemetry-module/firmware/src/display/mod.rs +++ b/telemetry-module/firmware/src/display/mod.rs @@ -4,7 +4,7 @@ pub(super) mod state; use crate::ui_button::{UiEvent, UI_INPUTS}; use core::cell::RefCell; -use defmt::{info, Format}; +use defmt::{debug, Format}; use display_interface_spi::SPIInterface; use embassy_embedded_hal::shared_bus::blocking::spi::SpiDeviceWithConfig; use embassy_futures::select::{select3, Either3}; @@ -103,7 +103,7 @@ pub(super) async fn task(r: crate::DisplayResources) { }; if let Some(draw_type) = draw_type { - info!("Display draw ({})", draw_type); + debug!("Display draw ({})", draw_type); if draw_type == DrawType::Full { draw_drawable!( &mut display,