Skip to content

Commit

Permalink
Add coalescing option for UART endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
r2-93 committed Jan 31, 2025
1 parent 21f7a59 commit 29f1695
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 16 deletions.
136 changes: 122 additions & 14 deletions src/endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ const ConfFile::OptionsTable UartEndpoint::option_table[] = {
{"AllowSrcSysIn", false, ConfFile::parse_uint8_vector, OPTIONS_TABLE_STRUCT_FIELD(UartEndpointConfig, allow_src_sys_in)},
{"BlockSrcSysIn", false, ConfFile::parse_uint8_vector, OPTIONS_TABLE_STRUCT_FIELD(UartEndpointConfig, block_src_sys_in)},
{"group", false, ConfFile::parse_stdstring, OPTIONS_TABLE_STRUCT_FIELD(UartEndpointConfig, group)},
{"CoalesceBytes", false, ConfFile::parse_ul, OPTIONS_TABLE_STRUCT_FIELD(UartEndpointConfig, coalesce_bytes)},
{"CoalesceMs", false, ConfFile::parse_ul, OPTIONS_TABLE_STRUCT_FIELD(UartEndpointConfig, coalesce_ms)},
{"CoalesceNoDelay", false, ConfFile::parse_uint32_vector, OPTIONS_TABLE_STRUCT_FIELD(UartEndpointConfig, coalesce_nodelay)},
{}
};

Expand Down Expand Up @@ -740,7 +743,27 @@ void Endpoint::log_aggregate(unsigned int interval_sec)
UartEndpoint::UartEndpoint(std::string name)
: Endpoint{ENDPOINT_TYPE_UART, std::move(name)}
{
// nothing else to do here

if (this->_name == "dynamic") {
this->_coalesce_bytes = 5000;
this->_coalesce_ms = 33;
}

_write_schedule_timer = Mainloop::get_instance().add_timeout(
_coalesce_ms, [this](void*)
{
flush_pending_msgs();
return true;
},
this);
}

UartEndpoint::~UartEndpoint()
{
if (_write_schedule_timer) {
Mainloop::get_instance().del_timeout(_write_schedule_timer);
_write_schedule_timer = nullptr;
}
}

bool UartEndpoint::setup(UartEndpointConfig conf)
Expand Down Expand Up @@ -809,6 +832,13 @@ bool UartEndpoint::setup(UartEndpointConfig conf)

this->_group_name = conf.group;

// Coalescing configs
this->_coalesce_bytes = conf.coalesce_bytes;
this->_coalesce_ms = conf.coalesce_ms;
for (auto msg_id : conf.coalesce_nodelay) {
this->add_no_coalesce_msg_id(msg_id);
}

return true;
}

Expand Down Expand Up @@ -1013,32 +1043,110 @@ ssize_t UartEndpoint::_read_msg(uint8_t *buf, size_t len)

int UartEndpoint::write_msg(const struct buffer *pbuf)
{

// We cannot add the new message to the coalescence -> send immediately what is scheduled
if (tx_buf.len > 0 && tx_buf.len + pbuf->len > _coalesce_bytes) {
log_trace("New message would overflow the coalescence, sending the pending ones before");
flush_pending_msgs();
}

if (tx_buf.len + pbuf->len > TX_BUF_MAX_SIZE) {
log_trace("Dropping message, tx buffer full");
return 0;
}

// Append new data in the tx buffer
memcpy(&tx_buf.data[tx_buf.len], pbuf->data, pbuf->len);
tx_buf.len += pbuf->len;

int ret = pbuf->len;

if (_coalesce_bytes == 0 || _coalesce_ms == 0
|| _coalesce_nodelay.find(pbuf->curr.msg_id) != _coalesce_nodelay.end()
|| pbuf->len > _coalesce_bytes) {
log_trace("DEBUG %s %d %d %d %d",
this->get_name().c_str(),
_coalesce_bytes == 0,
_coalesce_ms == 0,
_coalesce_nodelay.find(pbuf->curr.msg_id) != _coalesce_nodelay.end(),
pbuf->len > _coalesce_bytes);
// Coalescing disabled, or high priority message, or new message larger than the coalescence size
ret = flush_pending_msgs();
} else {
// Coalescing enabled
// Start coalescing timer (if not already running)
_schedule_write();

struct itimerspec ts;
timerfd_gettime(_write_schedule_timer->fd, &ts);
double timer_seconds
= (double)_coalesce_ms / 1e3d - (ts.it_value.tv_sec + ts.it_value.tv_nsec / 1e9d);

log_trace("Coalescence state: size=%u/%u, timeout=%.2f/%.2f, is high prio message=%d",
tx_buf.len,
_coalesce_bytes,
timer_seconds,
_coalesce_ms / 1e3d,
_coalesce_nodelay.find(pbuf->curr.msg_id) != _coalesce_nodelay.end());
}
return ret;

if (fd < 0) {
log_error("UART %s: Trying to write invalid fd", _name.c_str());
return -EINVAL;
}

/* TODO: send any pending data */
if (tx_buf.len > 0) {
;
ssize_t r = ::write(fd, tx_buf.data, tx_buf.len);
if (r == -1 && errno == EAGAIN) {
return -EAGAIN;
}

_stat.write.total++;
_stat.write.bytes += r;

tx_buf.len = std::max(ssize_t(0), (ssize_t)tx_buf.len - r);
// memcpy isn't safe for overlapping regions
memmove(tx_buf.data, &tx_buf.data[r], tx_buf.len);

log_trace("UART [%d]%s: Wrote %zd bytes", fd, _name.c_str(), r);

return r;
}

void UartEndpoint::_schedule_write()
{
if (!_write_scheduled) {
Mainloop::get_instance().mod_timeout(_write_schedule_timer, _coalesce_ms);
_write_scheduled = true;
}
}

int UartEndpoint::flush_pending_msgs()
{
Mainloop::get_instance().mod_timeout(_write_schedule_timer, 0);
_write_scheduled = false;

if (tx_buf.len == 0) {
log_trace("No data in tx buffer, skipping write");
return 0;
}

if (fd < 0) {
// skip this endpoint if not connected (e.g. during reconnect)
return 0;
}

ssize_t r = ::write(fd, pbuf->data, pbuf->len);
ssize_t r = ::write(fd, tx_buf.data, tx_buf.len);
if (r == -1 && errno == EAGAIN) {
return -EAGAIN;
}

_stat.write.total++;
_stat.write.bytes += pbuf->len;
_stat.write.bytes += r;

/* Incomplete packet, we warn and discard the rest */
if (r != (ssize_t)pbuf->len) {
_incomplete_msgs++;
log_debug("UART %s: Discarding packet, incomplete write %zd but len=%u",
_name.c_str(),
r,
pbuf->len);
}
tx_buf.len = std::max(ssize_t(0), (ssize_t)tx_buf.len - r);
// memcpy isn't safe for overlapping regions
memmove(tx_buf.data, &tx_buf.data[r], tx_buf.len);

log_trace("UART [%d]%s: Wrote %zd bytes", fd, _name.c_str(), r);

Expand Down
20 changes: 18 additions & 2 deletions src/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ struct UartEndpointConfig {
std::vector<uint8_t> allow_src_sys_in;
std::vector<uint8_t> block_src_sys_in;
std::string group;
unsigned long coalesce_bytes;
unsigned long coalesce_ms;
std::vector<uint32_t> coalesce_nodelay;
};

struct UdpEndpointConfig {
Expand Down Expand Up @@ -303,17 +306,22 @@ class Endpoint : public Pollable {
class UartEndpoint : public Endpoint {
public:
UartEndpoint(std::string name);
~UartEndpoint() override = default;
~UartEndpoint() override;

int write_msg(const struct buffer *pbuf) override;
int flush_pending_msgs() override { return -ENOSYS; }
int flush_pending_msgs() override;

bool setup(UartEndpointConfig config); ///< open UART device and apply config

static const ConfFile::OptionsTable option_table[];
static const char *section_pattern;
static bool validate_config(const UartEndpointConfig &config);

void add_no_coalesce_msg_id(uint32_t msg_id)
{
_coalesce_nodelay.insert(msg_id);
}

protected:
bool open(const char *path);
int set_speed(speed_t baudrate);
Expand All @@ -323,12 +331,20 @@ class UartEndpoint : public Endpoint {
int read_msg(struct buffer *pbuf) override;
ssize_t _read_msg(uint8_t *buf, size_t len) override;

void _schedule_write();
bool _write_scheduled = false;
Timeout *_write_schedule_timer = nullptr;
unsigned int _coalesce_bytes = 0UL; // max coalescence size
unsigned long _coalesce_ms = 0UL; // max time to hold data to try to coalesce packets together


private:
size_t _current_baud_idx = 0;
Timeout *_change_baud_timeout = nullptr;
std::vector<uint32_t> _baudrates;

bool _change_baud_cb(void *data);
std::set<uint32_t> _coalesce_nodelay{}; // immediately send if a mavlink msg_id is in this set
};

class UdpEndpoint : public Endpoint {
Expand Down

0 comments on commit 29f1695

Please sign in to comment.