Skip to content

Commit

Permalink
Packets coalescing on TCP endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanoColli committed Dec 4, 2024
1 parent 7e0923e commit 3777c5d
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 16 deletions.
110 changes: 95 additions & 15 deletions src/endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ const ConfFile::OptionsTable TcpEndpoint::option_table[] = {
{"AllowSrcSysIn", false, ConfFile::parse_uint8_vector, OPTIONS_TABLE_STRUCT_FIELD(TcpEndpointConfig, allow_src_sys_in)},
{"BlockSrcSysIn", false, ConfFile::parse_uint8_vector, OPTIONS_TABLE_STRUCT_FIELD(TcpEndpointConfig, block_src_sys_in)},
{"group", false, ConfFile::parse_stdstring, OPTIONS_TABLE_STRUCT_FIELD(TcpEndpointConfig, group)},
{"CoalesceBytes", false, ConfFile::parse_ul, OPTIONS_TABLE_STRUCT_FIELD(UdpEndpointConfig, coalesce_bytes)},
{"CoalesceMs", false, ConfFile::parse_ul, OPTIONS_TABLE_STRUCT_FIELD(UdpEndpointConfig, coalesce_ms)},
{"CoalesceNoDelay", false, ConfFile::parse_uint32_vector, OPTIONS_TABLE_STRUCT_FIELD(UdpEndpointConfig, coalesce_nodelay)},
{}
};
// clang-format on
Expand Down Expand Up @@ -1523,10 +1526,27 @@ TcpEndpoint::TcpEndpoint(std::string name)
{
bzero(&sockaddr, sizeof(sockaddr));
bzero(&sockaddr6, sizeof(sockaddr6));

if (this->_name == "dynamic") {
this->_coalesce_bytes = 5000;
this->_coalesce_ms = 33;
}

_write_schedule_timer = Mainloop::get_instance().add_timeout(
_coalesce_ms, [this](void*)
{
flush_pending_msgs();
return true;
},
this);
}

TcpEndpoint::~TcpEndpoint()
{
if (_write_schedule_timer) {
Mainloop::get_instance().del_timeout(_write_schedule_timer);
_write_schedule_timer = nullptr;
}
close();
}

Expand Down Expand Up @@ -1580,6 +1600,13 @@ bool TcpEndpoint::setup(TcpEndpointConfig conf)

this->_group_name = conf.group;

// Coalescing configs
this->_coalesce_bytes = conf.coalesce_bytes;
this->_coalesce_ms = conf.coalesce_ms;
for (auto msg_id : conf.coalesce_nodelay) {
this->add_no_coalesce_msg_id(msg_id);
}

if (!this->open(conf.address, conf.port)) {
log_warning("Could not open %s:%ld, re-trying every %d sec",
conf.address.c_str(),
Expand Down Expand Up @@ -1765,6 +1792,69 @@ ssize_t TcpEndpoint::_read_msg(uint8_t *buf, size_t len)

int TcpEndpoint::write_msg(const struct buffer *pbuf)
{
// We cannot add the new message to the coalescence -> send immediately what is scheduled
if (tx_buf.len > 0 && tx_buf.len + pbuf->len > _coalesce_bytes) {
log_trace("New message would overflow the coalescence, sending the pending ones before");
flush_pending_msgs();
}

if (tx_buf.len + pbuf->len > TX_BUF_MAX_SIZE) {
log_trace("Dropping message, tx buffer full");
return 0;
}

// Append new data in the tx buffer
memcpy(&tx_buf.data[tx_buf.len], pbuf->data, pbuf->len);
tx_buf.len += pbuf->len;

int ret = pbuf->len;

if (_coalesce_bytes == 0 ||
_coalesce_ms == 0 ||
_coalesce_nodelay.find(pbuf->curr.msg_id) != _coalesce_nodelay.end() ||
pbuf->len > _coalesce_bytes) {
log_trace("DEBUG %s %d %d %d %d", this->get_name().c_str(), _coalesce_bytes == 0, _coalesce_ms == 0,
_coalesce_nodelay.find(pbuf->curr.msg_id) != _coalesce_nodelay.end(), pbuf->len > _coalesce_bytes);
// Coalescing disabled, or high priority message, or new message larger than the coalescence size
ret = flush_pending_msgs();
}
else {
// Coalescing enabled
// Start coalescing timer (if not already running)
_schedule_write();

struct itimerspec ts;
timerfd_gettime(_write_schedule_timer->fd, &ts);
double timer_seconds = (double) _coalesce_ms/1e3d - (ts.it_value.tv_sec + ts.it_value.tv_nsec/1e9d);

log_trace("Coalescence state: size=%u/%u, timeout=%.2f/%.2f, is high prio message=%d",
tx_buf.len,
_coalesce_bytes,
timer_seconds,
_coalesce_ms/1e3d,
_coalesce_nodelay.find(pbuf->curr.msg_id) != _coalesce_nodelay.end());
}
return ret;
}

void TcpEndpoint::_schedule_write()
{
if (!_write_scheduled) {
Mainloop::get_instance().mod_timeout(_write_schedule_timer, _coalesce_ms);
_write_scheduled = true;
}
}

int TcpEndpoint::flush_pending_msgs()
{
Mainloop::get_instance().mod_timeout(_write_schedule_timer, 0);
_write_scheduled = false;

if (tx_buf.len == 0) {
log_trace("No data in tx buffer, skipping write");
return 0;
}

struct sockaddr *sock;
socklen_t addrlen;

Expand All @@ -1773,11 +1863,6 @@ int TcpEndpoint::write_msg(const struct buffer *pbuf)
return 0;
}

/* TODO: send any pending data */
if (tx_buf.len > 0) {
;
}

if (this->is_ipv6) {
sock = (struct sockaddr *)&sockaddr6;
addrlen = sizeof(sockaddr6);
Expand All @@ -1786,7 +1871,7 @@ int TcpEndpoint::write_msg(const struct buffer *pbuf)
addrlen = sizeof(sockaddr);
}

ssize_t r = ::sendto(fd, pbuf->data, pbuf->len, 0, sock, addrlen);
ssize_t r = ::sendto(fd, tx_buf.data, tx_buf.len, 0, sock, addrlen);
if (r == -1) {
if (errno != EAGAIN && errno != ECONNREFUSED) {
log_error("TCP %s: Error sending tcp packet (%m)", _name.c_str());
Expand All @@ -1803,16 +1888,11 @@ int TcpEndpoint::write_msg(const struct buffer *pbuf)
};

_stat.write.total++;
_stat.write.bytes += pbuf->len;
_stat.write.bytes += r;

/* Incomplete packet, we warn and discard the rest */
if (r != (ssize_t)pbuf->len) {
_incomplete_msgs++;
log_debug("TCP %s: Discarding packet, incomplete write %zd but len=%u",
_name.c_str(),
r,
pbuf->len);
}
tx_buf.len = std::max(ssize_t(0), (ssize_t)tx_buf.len - r);
// memcpy isn't safe for overlapping regions
memmove(tx_buf.data, &tx_buf.data[r], tx_buf.len);

log_trace("TCP [%d]%s: Wrote %zd bytes", fd, _name.c_str(), r);

Expand Down
20 changes: 19 additions & 1 deletion src/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ struct TcpEndpointConfig {
std::vector<uint8_t> allow_src_sys_in;
std::vector<uint8_t> block_src_sys_in;
std::string group;
unsigned long coalesce_bytes;
unsigned long coalesce_ms;
std::vector<uint32_t> coalesce_nodelay;
};

/*
Expand Down Expand Up @@ -386,7 +389,7 @@ class TcpEndpoint : public Endpoint {
~TcpEndpoint() override;

int write_msg(const struct buffer *pbuf) override;
int flush_pending_msgs() override { return -ENOSYS; }
int flush_pending_msgs() override;
bool is_valid() override { return _valid; };
bool is_critical() override { return false; };

Expand All @@ -401,6 +404,11 @@ class TcpEndpoint : public Endpoint {
static const char *section_pattern;
static bool validate_config(const TcpEndpointConfig &config);

void add_no_coalesce_msg_id(uint32_t msg_id)
{
_coalesce_nodelay.insert(msg_id);
}

protected:
bool open(const std::string &ip, unsigned long port);
static int open_ipv4(const char *ip, unsigned long port, sockaddr_in &sockaddr);
Expand All @@ -411,6 +419,14 @@ class TcpEndpoint : public Endpoint {
void _schedule_reconnect();
bool _retry_timeout_cb(void *data);

void _schedule_write();
bool _write_scheduled = false;

Timeout *_write_schedule_timer = nullptr;

unsigned int _coalesce_bytes = 0UL; // max coalescence size
unsigned long _coalesce_ms = 0UL; // max time to hold data to try to coalesce packets together

private:
std::string _ip{};
unsigned long _port = 0;
Expand All @@ -420,4 +436,6 @@ class TcpEndpoint : public Endpoint {
int _retry_timeout = 0; // disable retry by default
struct sockaddr_in sockaddr;
struct sockaddr_in6 sockaddr6;
std::set<uint32_t> _coalesce_nodelay{}; // immediately send if a mavlink msg_id is in this set

};

0 comments on commit 3777c5d

Please sign in to comment.