Skip to content

Commit

Permalink
Replace uORB poll with HRT-based polling
Browse files Browse the repository at this point in the history
Signed-off-by: Jukka Laitinen <[email protected]>
  • Loading branch information
jlaitine committed Jun 6, 2024
1 parent 01cff3a commit 72e6980
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 262 deletions.
16 changes: 5 additions & 11 deletions src/modules/mavlink/mavlink_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2356,7 +2356,6 @@ Mavlink::task_main(int argc, char *argv[])
uorb_poll_error_counter++;
}

_stream_poller->ack_all();
#else
px4_usleep(_main_loop_delay);
#endif
Expand Down Expand Up @@ -2883,27 +2882,22 @@ void Mavlink::configure_sik_radio()


int
Mavlink::register_orb_poll(uint16_t stream_id, ORB_ID *orbs, int count)
Mavlink::register_stream_poll(uint16_t stream_id, uint32_t interval_ms)
{
#if defined(CONFIG_MAVLINK_UORB_POLL)
return _stream_poller->register_orbs(stream_id, orbs, count);
return _stream_poller->register_poll(stream_id, interval_ms);
#else
(void)stream_id;
(void)orbs;
(void)count;
return PX4_OK;
#endif
}

int
Mavlink::unregister_orb_poll(uint16_t stream_id)
Mavlink::unregister_stream_poll(uint16_t stream_id)
{
#if defined(CONFIG_MAVLINK_UORB_POLL)
return _stream_poller->unregister_orbs(stream_id);
#else
(void)stream_id;
return PX4_OK;
_stream_poller->unregister_poll(stream_id);
#endif
return PX4_OK;
}

int
Expand Down
4 changes: 2 additions & 2 deletions src/modules/mavlink/mavlink_main.h
Original file line number Diff line number Diff line change
Expand Up @@ -527,8 +527,8 @@ class Mavlink final : public ModuleParams
/**
* Register/Unregister a stream orbs for polling
*/
int register_orb_poll(uint16_t stream_id, ORB_ID *orbs, int count);
int unregister_orb_poll(uint16_t stream_id);
int register_stream_poll(uint16_t stream_id, uint32_t interval_ms);
int unregister_stream_poll(uint16_t stream_id);

int set_stream_interval(MavlinkStream *stream, int interval);

Expand Down
243 changes: 42 additions & 201 deletions src/modules/mavlink/mavlink_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,225 +130,66 @@ MavlinkStream::update(const hrt_abstime &t)
}

#if defined(CONFIG_MAVLINK_UORB_POLL)
// 20ms default update rate for polled topics, if not set otherwise
const unsigned int MavlinkStreamDefaultTopicInterval = 20;

MavlinkStreamPoll::MavlinkStreamPoll() :
_fds{nullptr},
_orbs{nullptr},
_reqs{nullptr},
_reqs_capacity{16},
_reqs_count{0},
_capacity{16},
_count{0}

MavlinkStreamPoll::MavlinkStreamPoll()
{
_orbs = (MavStreamPollItem *)malloc(16 * sizeof(MavStreamPollItem));
_fds = (orb_poll_struct_t *)malloc(16 * sizeof(orb_poll_struct_t));
_reqs = (MavStreamOrbPollReq *)malloc(16 * sizeof(MavStreamOrbPollReq));
px4_sem_init(&_poll_sem, 1, 0);
#if defined(__PX4_NUTTX)
sem_setprotocol(&_poll_sem, SEM_PRIO_NONE);
#endif

pthread_mutex_init(&_mtx, nullptr);
}

MavlinkStreamPoll::~MavlinkStreamPoll()
{
if (_fds) {
free(_fds);
}

if (_orbs) {
free(_orbs);
}

if (_reqs) {
free(_reqs);
for (auto req : _reqs) {
_reqs.remove(req);
hrt_cancel(&req->_hrt_req);
delete (req);
}

pthread_mutex_destroy(&_mtx);
}

int
MavlinkStreamPoll::_add_orb(ORB_ID orb_id, int interval_ms)
{
// Check if the orb is already in the list
bool exists = false;

for (int i = 0; i < _count; i++) {
if (_orbs[i].orb_id == orb_id && _orbs[i].interval == interval_ms) {
_orbs[i].usage_count++;
exists = true;
break;
}
}

// Did not exist, add new one to the list
if (!exists) {
while (_count >= _capacity) {
_capacity = _capacity + 16;
_orbs = (MavStreamPollItem *)realloc(_orbs, _capacity * sizeof(MavStreamPollItem));
_fds = (orb_poll_struct_t *)realloc(_fds, _capacity * sizeof(orb_poll_struct_t));

if (!_fds || !_orbs) {
PX4_ERR("failed to allocate memory for orb poll items");
return PX4_ERROR;
}
}

_orbs[_count].orb_id = orb_id;
_orbs[_count].interval = interval_ms;
_orbs[_count].usage_count = 1;
_orbs[_count].fd = orb_subscribe(get_orb_meta(orb_id));
orb_set_interval(_orbs[_count].fd, interval_ms);
_count++;
}

return PX4_OK;
}

int
MavlinkStreamPoll::_remove_orb(ORB_ID orb_id, int interval_ms)
MavlinkStreamPoll::register_poll(uint16_t stream_id, uint32_t interval_ms)
{
// Decrement usage/remove the item from the orbs list
for (int i = 0; i < _count; i++) {
if (_orbs[i].orb_id == orb_id &&
_orbs[i].interval == interval_ms) {
_orbs[i].usage_count--;

// If this was the last request for the orb,
// unsubscribe and remove from orbs list
if (_orbs[i].usage_count == 0) {
orb_unsubscribe(_orbs[i].fd);
// Replace the current item with
// the last item in the _orbs list.
// Loop counting is not disturbed because
// we break out from loop after this.
_orbs[i] = _orbs[--_count];
}

break;
}
}

return PX4_OK;
}

/**
* Register stream object to poll list of orbs
*/
int
MavlinkStreamPoll::register_orbs(uint16_t stream_id, ORB_ID *orbs, int cnt)
{
if (!orbs || cnt <= 0) {
return PX4_OK;
}
MavStreamPollReq *req = new MavStreamPollReq(stream_id);

pthread_mutex_lock(&_mtx);

for (int i = 0; i < cnt; i++) {

// Increase reqs capacity if necessary
while (_reqs_count >= _reqs_capacity) {
_reqs_capacity = _reqs_capacity + 16;
_reqs = (MavStreamOrbPollReq *)realloc(_reqs, _reqs_capacity * sizeof(MavStreamOrbPollReq));

if (!_reqs) {
PX4_ERR("failed to allocate memory for orb poll reqs");
pthread_mutex_unlock(&_mtx);
return PX4_ERROR;
}
}

MavStreamOrbPollReq *req = &_reqs[_reqs_count];
req->stream_id = stream_id;
req->orb_id = orbs[i];
req->interval = MavlinkStreamDefaultTopicInterval;
_reqs_count++;

// Update orbs list
_add_orb(orbs[i], req->interval);
}

// Update fds
for (int i = 0; i < _count; i++) {
_fds[i].fd = _orbs[i].fd;
_fds[i].events = POLLIN;
_fds[i].revents = 0;
}

_reqs.add(req);
pthread_mutex_unlock(&_mtx);
return PX4_OK;

hrt_call_every(&req->_hrt_req, (hrt_abstime)interval_ms * 1000,
(hrt_abstime)interval_ms * 1000, hrt_callback, &_poll_sem);
return OK;
}

/**
* Unregister stream object from orbs polling
*/
int
MavlinkStreamPoll::unregister_orbs(uint16_t stream_id)
void
MavlinkStreamPoll::unregister_poll(uint16_t stream_id)
{
int i = 0;

pthread_mutex_lock(&_mtx);

while (i < _reqs_count) {
if (_reqs[i].stream_id == stream_id) {

// Remove orb from the orbs list
_remove_orb(_reqs[i].orb_id, _reqs[i].interval);

// Finally, replace the current item with
// the last item in the _reqs list
_reqs[i] = _reqs[--_reqs_count];

} else {
// Only increment if current _reqs item is not removed.
// Otherwise we have moved the last item to the current
// position, so we need to check the same index again.
i++;
for (auto req : _reqs) {
if (req->_stream_id == stream_id) {
_reqs.remove(req);
hrt_cancel(&req->_hrt_req);
delete (req);
break;
}
}

// Update fds
for (int j = 0; j < _count; j++) {
_fds[j].fd = _orbs[j].fd;
_fds[j].events = POLLIN;
_fds[j].revents = 0;
}

pthread_mutex_unlock(&_mtx);
return PX4_OK;
}

/**
* Set stream update interval to adjust orb polling accordingly
*/
int
MavlinkStreamPoll::set_interval(uint16_t stream_id, int interval_ms)
MavlinkStreamPoll::set_interval(uint16_t stream_id, uint32_t interval_ms)
{
pthread_mutex_lock(&_mtx);

// Renew all uorb subscriptions of given stream with new interval value
for (int i = 0; i < _count; i++) {
if (_reqs[i].stream_id == stream_id) {

// Remove orb_id subscription with current old interval
_remove_orb(_reqs[i].orb_id, _reqs[i].interval);

// Update interval
_reqs[i].interval = interval_ms;

// Add orb_id subscription with new interval
_add_orb(_reqs[i].orb_id, _reqs[i].interval);

}
}

// Update fds
for (int j = 0; j < _count; j++) {
_fds[j].fd = _orbs[j].fd;
_fds[j].events = POLLIN;
_fds[j].revents = 0;
}

pthread_mutex_unlock(&_mtx);
return PX4_OK;
unregister_poll(stream_id);
return register_poll(stream_id, interval_ms);
}

/**
Expand All @@ -364,27 +205,27 @@ MavlinkStreamPoll::poll(const hrt_abstime timeout)
timeout_ms = 1;
}

pthread_mutex_lock(&_mtx);
// Wait event for a maximum timeout time

ret = px4_poll(_fds, _count, timeout_ms);
struct timespec to;
#if defined(CONFIG_ARCH_BOARD_PX4_SITL)
px4_clock_gettime(CLOCK_MONOTONIC, &to);
#else
px4_clock_gettime(CLOCK_REALTIME, &to);
#endif
hrt_abstime now = ts_to_abstime(&to);
abstime_to_ts(&to, now + (hrt_abstime)timeout * 1000);

pthread_mutex_unlock(&_mtx);
ret = px4_sem_timedwait(&_poll_sem, &to);

return ret;
}

/**
* Acknowledge all orb data for next poll
*/
void
MavlinkStreamPoll::ack_all()
MavlinkStreamPoll::hrt_callback(void *arg)
{
pthread_mutex_lock(&_mtx);
px4_sem_post((px4_sem_t *)arg);
}

for (int i = 0; i < _count; i++) {
orb_ack(_orbs[i].fd);
}

pthread_mutex_unlock(&_mtx);
}
#endif /* CONFIG_MAVLINK_UORB_POLL */
Loading

0 comments on commit 72e6980

Please sign in to comment.