From 86023d54d927870f73c82532f14598e1e642b0da Mon Sep 17 00:00:00 2001 From: Felix Obenhuber Date: Tue, 26 Mar 2024 16:38:07 +0100 Subject: [PATCH] feat(rumqttc): simplify keepalive interval --- rumqttc/src/v5/eventloop.rs | 35 +++++++++++++++++------------------ rumqttc/src/v5/framed.rs | 2 +- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/rumqttc/src/v5/eventloop.rs b/rumqttc/src/v5/eventloop.rs index e109a6bf4..e96ace350 100644 --- a/rumqttc/src/v5/eventloop.rs +++ b/rumqttc/src/v5/eventloop.rs @@ -7,7 +7,8 @@ use crate::framed::AsyncReadWrite; use flume::Receiver; use futures_util::{Stream, StreamExt}; use tokio::select; -use tokio::time::{self, error::Elapsed, Instant, Sleep}; +use tokio::time::Interval; +use tokio::time::{self, error::Elapsed}; use std::collections::VecDeque; use std::io; @@ -85,7 +86,7 @@ pub struct EventLoop { /// Network connection to the broker network: Option, /// Keep alive time - keepalive_timeout: Option>>, + keepalive_interval: Interval, } /// Events which can be yielded by the event loop @@ -103,6 +104,8 @@ impl EventLoop { let pending = IntervalQueue::new(options.pending_throttle); let batch_size = options.max_batch_size; let state = MqttState::new(inflight_limit, manual_acks); + assert!(!options.keep_alive.is_zero()); + let keepalive_interval = time::interval(options.keep_alive()); EventLoop { options, @@ -111,7 +114,7 @@ impl EventLoop { requests, pending, network: None, - keepalive_timeout: None, + keepalive_interval, } } @@ -124,7 +127,6 @@ impl EventLoop { /// > For this reason we recommend setting [`AsycClient`](super::AsyncClient)'s channel capacity to `0`. pub fn clean(&mut self) { self.network = None; - self.keepalive_timeout = None; self.pending.extend(self.state.clean()); // drain requests from channel which weren't yet received @@ -144,14 +146,12 @@ impl EventLoop { .await??; self.network = Some(network); - if self.keepalive_timeout.is_none() { - self.keepalive_timeout = Some(Box::pin(time::sleep(self.options.keep_alive))); - } - // A connack never produces a response packet. Safe to ignore the return value // of `handle_incoming_packet` self.state.handle_incoming_packet(connack)?; - self.pending.reset(); + + self.pending.reset_immediately(); + self.keepalive_interval.reset(); } // Read buffered events from previous polls before calling a new poll @@ -216,8 +216,10 @@ impl EventLoop { network.write(packet).await?; } }, - // Process next packet received from io + // Process next packet from io packet = network.read() => { + // Reset keepalive interval due to packet reception + self.keepalive_interval.reset(); match packet? { Some(packet) => if let Some(packet) = self.state.handle_incoming_packet(packet)? { let flush = matches!(packet, Packet::PingResp(_)); @@ -229,11 +231,8 @@ impl EventLoop { None => return Err(ConnectionError::ConnectionClosed), } }, - // We generate pings irrespective of network activity. This keeps the ping logic - // simple. We can change this behavior in future if necessary (to prevent extra pings) - _ = self.keepalive_timeout.as_mut().unwrap() => { - let timeout = self.keepalive_timeout.as_mut().unwrap(); - timeout.as_mut().reset(Instant::now() + self.options.keep_alive); + // Send a ping request on each interval tick + _ = self.keepalive_interval.tick() => { if let Some(packet) = self.state.handle_outgoing_packet(Request::PingReq)? { network.write(packet).await?; } @@ -291,10 +290,10 @@ impl IntervalQueue { self.queue.extend(requests); } - /// Reset the pending interval tick - pub fn reset(&mut self) { + /// Reset the pending interval tick. Next tick yields immediately + pub fn reset_immediately(&mut self) { if let Some(interval) = self.interval.as_mut() { - interval.reset(); + interval.reset_immediately(); } } } diff --git a/rumqttc/src/v5/framed.rs b/rumqttc/src/v5/framed.rs index 9b02dc98a..207118d65 100644 --- a/rumqttc/src/v5/framed.rs +++ b/rumqttc/src/v5/framed.rs @@ -41,7 +41,7 @@ impl Network { match self.framed.next().await { Some(Ok(packet)) => Ok(Some(packet)), Some(Err(e)) => Err(StateError::Deserialization(e)), - None => Ok(None) + None => Ok(None), } }