From 29f1695159d14c7fab8ea07ead24416b8034a0ab Mon Sep 17 00:00:00 2001 From: r2-93 Date: Tue, 28 Jan 2025 16:27:33 +0100 Subject: [PATCH] Add coalescing option for UART endpoints --- src/endpoint.cpp | 136 ++++++++++++++++++++++++++++++++++++++++++----- src/endpoint.h | 20 ++++++- 2 files changed, 140 insertions(+), 16 deletions(-) diff --git a/src/endpoint.cpp b/src/endpoint.cpp index d945c8c0..b20cd5ea 100644 --- a/src/endpoint.cpp +++ b/src/endpoint.cpp @@ -74,6 +74,9 @@ const ConfFile::OptionsTable UartEndpoint::option_table[] = { {"AllowSrcSysIn", false, ConfFile::parse_uint8_vector, OPTIONS_TABLE_STRUCT_FIELD(UartEndpointConfig, allow_src_sys_in)}, {"BlockSrcSysIn", false, ConfFile::parse_uint8_vector, OPTIONS_TABLE_STRUCT_FIELD(UartEndpointConfig, block_src_sys_in)}, {"group", false, ConfFile::parse_stdstring, OPTIONS_TABLE_STRUCT_FIELD(UartEndpointConfig, group)}, + {"CoalesceBytes", false, ConfFile::parse_ul, OPTIONS_TABLE_STRUCT_FIELD(UartEndpointConfig, coalesce_bytes)}, + {"CoalesceMs", false, ConfFile::parse_ul, OPTIONS_TABLE_STRUCT_FIELD(UartEndpointConfig, coalesce_ms)}, + {"CoalesceNoDelay", false, ConfFile::parse_uint32_vector, OPTIONS_TABLE_STRUCT_FIELD(UartEndpointConfig, coalesce_nodelay)}, {} }; @@ -740,7 +743,27 @@ void Endpoint::log_aggregate(unsigned int interval_sec) UartEndpoint::UartEndpoint(std::string name) : Endpoint{ENDPOINT_TYPE_UART, std::move(name)} { - // nothing else to do here + + if (this->_name == "dynamic") { + this->_coalesce_bytes = 5000; + this->_coalesce_ms = 33; + } + + _write_schedule_timer = Mainloop::get_instance().add_timeout( + _coalesce_ms, [this](void*) + { + flush_pending_msgs(); + return true; + }, + this); +} + +UartEndpoint::~UartEndpoint() +{ + if (_write_schedule_timer) { + Mainloop::get_instance().del_timeout(_write_schedule_timer); + _write_schedule_timer = nullptr; + } } bool UartEndpoint::setup(UartEndpointConfig conf) @@ -809,6 +832,13 @@ bool UartEndpoint::setup(UartEndpointConfig conf) this->_group_name = conf.group; + // Coalescing configs + this->_coalesce_bytes = conf.coalesce_bytes; + this->_coalesce_ms = conf.coalesce_ms; + for (auto msg_id : conf.coalesce_nodelay) { + this->add_no_coalesce_msg_id(msg_id); + } + return true; } @@ -1013,32 +1043,110 @@ ssize_t UartEndpoint::_read_msg(uint8_t *buf, size_t len) int UartEndpoint::write_msg(const struct buffer *pbuf) { + + // We cannot add the new message to the coalescence -> send immediately what is scheduled + if (tx_buf.len > 0 && tx_buf.len + pbuf->len > _coalesce_bytes) { + log_trace("New message would overflow the coalescence, sending the pending ones before"); + flush_pending_msgs(); + } + + if (tx_buf.len + pbuf->len > TX_BUF_MAX_SIZE) { + log_trace("Dropping message, tx buffer full"); + return 0; + } + + // Append new data in the tx buffer + memcpy(&tx_buf.data[tx_buf.len], pbuf->data, pbuf->len); + tx_buf.len += pbuf->len; + + int ret = pbuf->len; + + if (_coalesce_bytes == 0 || _coalesce_ms == 0 + || _coalesce_nodelay.find(pbuf->curr.msg_id) != _coalesce_nodelay.end() + || pbuf->len > _coalesce_bytes) { + log_trace("DEBUG %s %d %d %d %d", + this->get_name().c_str(), + _coalesce_bytes == 0, + _coalesce_ms == 0, + _coalesce_nodelay.find(pbuf->curr.msg_id) != _coalesce_nodelay.end(), + pbuf->len > _coalesce_bytes); + // Coalescing disabled, or high priority message, or new message larger than the coalescence size + ret = flush_pending_msgs(); + } else { + // Coalescing enabled + // Start coalescing timer (if not already running) + _schedule_write(); + + struct itimerspec ts; + timerfd_gettime(_write_schedule_timer->fd, &ts); + double timer_seconds + = (double)_coalesce_ms / 1e3d - (ts.it_value.tv_sec + ts.it_value.tv_nsec / 1e9d); + + log_trace("Coalescence state: size=%u/%u, timeout=%.2f/%.2f, is high prio message=%d", + tx_buf.len, + _coalesce_bytes, + timer_seconds, + _coalesce_ms / 1e3d, + _coalesce_nodelay.find(pbuf->curr.msg_id) != _coalesce_nodelay.end()); + } + return ret; + if (fd < 0) { log_error("UART %s: Trying to write invalid fd", _name.c_str()); return -EINVAL; } - /* TODO: send any pending data */ - if (tx_buf.len > 0) { - ; + ssize_t r = ::write(fd, tx_buf.data, tx_buf.len); + if (r == -1 && errno == EAGAIN) { + return -EAGAIN; + } + + _stat.write.total++; + _stat.write.bytes += r; + + tx_buf.len = std::max(ssize_t(0), (ssize_t)tx_buf.len - r); + // memcpy isn't safe for overlapping regions + memmove(tx_buf.data, &tx_buf.data[r], tx_buf.len); + + log_trace("UART [%d]%s: Wrote %zd bytes", fd, _name.c_str(), r); + + return r; +} + +void UartEndpoint::_schedule_write() +{ + if (!_write_scheduled) { + Mainloop::get_instance().mod_timeout(_write_schedule_timer, _coalesce_ms); + _write_scheduled = true; + } +} + +int UartEndpoint::flush_pending_msgs() +{ + Mainloop::get_instance().mod_timeout(_write_schedule_timer, 0); + _write_scheduled = false; + + if (tx_buf.len == 0) { + log_trace("No data in tx buffer, skipping write"); + return 0; + } + + if (fd < 0) { + // skip this endpoint if not connected (e.g. during reconnect) + return 0; } - ssize_t r = ::write(fd, pbuf->data, pbuf->len); + ssize_t r = ::write(fd, tx_buf.data, tx_buf.len); if (r == -1 && errno == EAGAIN) { return -EAGAIN; } _stat.write.total++; - _stat.write.bytes += pbuf->len; + _stat.write.bytes += r; - /* Incomplete packet, we warn and discard the rest */ - if (r != (ssize_t)pbuf->len) { - _incomplete_msgs++; - log_debug("UART %s: Discarding packet, incomplete write %zd but len=%u", - _name.c_str(), - r, - pbuf->len); - } + tx_buf.len = std::max(ssize_t(0), (ssize_t)tx_buf.len - r); + // memcpy isn't safe for overlapping regions + memmove(tx_buf.data, &tx_buf.data[r], tx_buf.len); log_trace("UART [%d]%s: Wrote %zd bytes", fd, _name.c_str(), r); diff --git a/src/endpoint.h b/src/endpoint.h index 837ed6e9..ce73fc87 100644 --- a/src/endpoint.h +++ b/src/endpoint.h @@ -55,6 +55,9 @@ struct UartEndpointConfig { std::vector allow_src_sys_in; std::vector block_src_sys_in; std::string group; + unsigned long coalesce_bytes; + unsigned long coalesce_ms; + std::vector coalesce_nodelay; }; struct UdpEndpointConfig { @@ -303,10 +306,10 @@ class Endpoint : public Pollable { class UartEndpoint : public Endpoint { public: UartEndpoint(std::string name); - ~UartEndpoint() override = default; + ~UartEndpoint() override; int write_msg(const struct buffer *pbuf) override; - int flush_pending_msgs() override { return -ENOSYS; } + int flush_pending_msgs() override; bool setup(UartEndpointConfig config); ///< open UART device and apply config @@ -314,6 +317,11 @@ class UartEndpoint : public Endpoint { static const char *section_pattern; static bool validate_config(const UartEndpointConfig &config); + void add_no_coalesce_msg_id(uint32_t msg_id) + { + _coalesce_nodelay.insert(msg_id); + } + protected: bool open(const char *path); int set_speed(speed_t baudrate); @@ -323,12 +331,20 @@ class UartEndpoint : public Endpoint { int read_msg(struct buffer *pbuf) override; ssize_t _read_msg(uint8_t *buf, size_t len) override; + void _schedule_write(); + bool _write_scheduled = false; + Timeout *_write_schedule_timer = nullptr; + unsigned int _coalesce_bytes = 0UL; // max coalescence size + unsigned long _coalesce_ms = 0UL; // max time to hold data to try to coalesce packets together + + private: size_t _current_baud_idx = 0; Timeout *_change_baud_timeout = nullptr; std::vector _baudrates; bool _change_baud_cb(void *data); + std::set _coalesce_nodelay{}; // immediately send if a mavlink msg_id is in this set }; class UdpEndpoint : public Endpoint {