diff --git a/src/modules/mavlink/mavlink_main.cpp b/src/modules/mavlink/mavlink_main.cpp index c88a154115ca..2f5b9bc7d8d8 100644 --- a/src/modules/mavlink/mavlink_main.cpp +++ b/src/modules/mavlink/mavlink_main.cpp @@ -2356,7 +2356,6 @@ Mavlink::task_main(int argc, char *argv[]) uorb_poll_error_counter++; } - _stream_poller->ack_all(); #else px4_usleep(_main_loop_delay); #endif @@ -2883,27 +2882,22 @@ void Mavlink::configure_sik_radio() int -Mavlink::register_orb_poll(uint16_t stream_id, ORB_ID *orbs, int count) +Mavlink::register_stream_poll(uint16_t stream_id, uint32_t interval_ms) { #if defined(CONFIG_MAVLINK_UORB_POLL) - return _stream_poller->register_orbs(stream_id, orbs, count); + return _stream_poller->register_poll(stream_id, interval_ms); #else - (void)stream_id; - (void)orbs; - (void)count; return PX4_OK; #endif } int -Mavlink::unregister_orb_poll(uint16_t stream_id) +Mavlink::unregister_stream_poll(uint16_t stream_id) { #if defined(CONFIG_MAVLINK_UORB_POLL) - return _stream_poller->unregister_orbs(stream_id); -#else - (void)stream_id; - return PX4_OK; + _stream_poller->unregister_poll(stream_id); #endif + return PX4_OK; } int diff --git a/src/modules/mavlink/mavlink_main.h b/src/modules/mavlink/mavlink_main.h index cfbfbb129aa0..d874382c67bd 100644 --- a/src/modules/mavlink/mavlink_main.h +++ b/src/modules/mavlink/mavlink_main.h @@ -527,8 +527,8 @@ class Mavlink final : public ModuleParams /** * Register/Unregister a stream orbs for polling */ - int register_orb_poll(uint16_t stream_id, ORB_ID *orbs, int count); - int unregister_orb_poll(uint16_t stream_id); + int register_stream_poll(uint16_t stream_id, uint32_t interval_ms); + int unregister_stream_poll(uint16_t stream_id); int set_stream_interval(MavlinkStream *stream, int interval); diff --git a/src/modules/mavlink/mavlink_stream.cpp b/src/modules/mavlink/mavlink_stream.cpp index b5e3e53d9da2..a4566357bad1 100644 --- a/src/modules/mavlink/mavlink_stream.cpp +++ b/src/modules/mavlink/mavlink_stream.cpp @@ -130,225 +130,66 @@ MavlinkStream::update(const hrt_abstime &t) } #if defined(CONFIG_MAVLINK_UORB_POLL) -// 20ms default update rate for polled topics, if not set otherwise -const unsigned int MavlinkStreamDefaultTopicInterval = 20; - -MavlinkStreamPoll::MavlinkStreamPoll() : - _fds{nullptr}, - _orbs{nullptr}, - _reqs{nullptr}, - _reqs_capacity{16}, - _reqs_count{0}, - _capacity{16}, - _count{0} + +MavlinkStreamPoll::MavlinkStreamPoll() { - _orbs = (MavStreamPollItem *)malloc(16 * sizeof(MavStreamPollItem)); - _fds = (orb_poll_struct_t *)malloc(16 * sizeof(orb_poll_struct_t)); - _reqs = (MavStreamOrbPollReq *)malloc(16 * sizeof(MavStreamOrbPollReq)); + px4_sem_init(&_poll_sem, 1, 0); +#if defined(__PX4_NUTTX) + sem_setprotocol(&_poll_sem, SEM_PRIO_NONE); +#endif + pthread_mutex_init(&_mtx, nullptr); } MavlinkStreamPoll::~MavlinkStreamPoll() { - if (_fds) { - free(_fds); - } - - if (_orbs) { - free(_orbs); - } - - if (_reqs) { - free(_reqs); + for (auto req : _reqs) { + _reqs.remove(req); + hrt_cancel(&req->_hrt_req); + delete (req); } pthread_mutex_destroy(&_mtx); } int -MavlinkStreamPoll::_add_orb(ORB_ID orb_id, int interval_ms) -{ - // Check if the orb is already in the list - bool exists = false; - - for (int i = 0; i < _count; i++) { - if (_orbs[i].orb_id == orb_id && _orbs[i].interval == interval_ms) { - _orbs[i].usage_count++; - exists = true; - break; - } - } - - // Did not exist, add new one to the list - if (!exists) { - while (_count >= _capacity) { - _capacity = _capacity + 16; - _orbs = (MavStreamPollItem *)realloc(_orbs, _capacity * sizeof(MavStreamPollItem)); - _fds = (orb_poll_struct_t *)realloc(_fds, _capacity * sizeof(orb_poll_struct_t)); - - if (!_fds || !_orbs) { - PX4_ERR("failed to allocate memory for orb poll items"); - return PX4_ERROR; - } - } - - _orbs[_count].orb_id = orb_id; - _orbs[_count].interval = interval_ms; - _orbs[_count].usage_count = 1; - _orbs[_count].fd = orb_subscribe(get_orb_meta(orb_id)); - orb_set_interval(_orbs[_count].fd, interval_ms); - _count++; - } - - return PX4_OK; -} - -int -MavlinkStreamPoll::_remove_orb(ORB_ID orb_id, int interval_ms) +MavlinkStreamPoll::register_poll(uint16_t stream_id, uint32_t interval_ms) { - // Decrement usage/remove the item from the orbs list - for (int i = 0; i < _count; i++) { - if (_orbs[i].orb_id == orb_id && - _orbs[i].interval == interval_ms) { - _orbs[i].usage_count--; - - // If this was the last request for the orb, - // unsubscribe and remove from orbs list - if (_orbs[i].usage_count == 0) { - orb_unsubscribe(_orbs[i].fd); - // Replace the current item with - // the last item in the _orbs list. - // Loop counting is not disturbed because - // we break out from loop after this. - _orbs[i] = _orbs[--_count]; - } - - break; - } - } - - return PX4_OK; -} -/** - * Register stream object to poll list of orbs - */ -int -MavlinkStreamPoll::register_orbs(uint16_t stream_id, ORB_ID *orbs, int cnt) -{ - if (!orbs || cnt <= 0) { - return PX4_OK; - } + MavStreamPollReq *req = new MavStreamPollReq(stream_id); pthread_mutex_lock(&_mtx); - - for (int i = 0; i < cnt; i++) { - - // Increase reqs capacity if necessary - while (_reqs_count >= _reqs_capacity) { - _reqs_capacity = _reqs_capacity + 16; - _reqs = (MavStreamOrbPollReq *)realloc(_reqs, _reqs_capacity * sizeof(MavStreamOrbPollReq)); - - if (!_reqs) { - PX4_ERR("failed to allocate memory for orb poll reqs"); - pthread_mutex_unlock(&_mtx); - return PX4_ERROR; - } - } - - MavStreamOrbPollReq *req = &_reqs[_reqs_count]; - req->stream_id = stream_id; - req->orb_id = orbs[i]; - req->interval = MavlinkStreamDefaultTopicInterval; - _reqs_count++; - - // Update orbs list - _add_orb(orbs[i], req->interval); - } - - // Update fds - for (int i = 0; i < _count; i++) { - _fds[i].fd = _orbs[i].fd; - _fds[i].events = POLLIN; - _fds[i].revents = 0; - } - + _reqs.add(req); pthread_mutex_unlock(&_mtx); - return PX4_OK; + + hrt_call_every(&req->_hrt_req, (hrt_abstime)interval_ms * 1000, + (hrt_abstime)interval_ms * 1000, hrt_callback, &_poll_sem); + return OK; } -/** - * Unregister stream object from orbs polling - */ -int -MavlinkStreamPoll::unregister_orbs(uint16_t stream_id) +void +MavlinkStreamPoll::unregister_poll(uint16_t stream_id) { - int i = 0; pthread_mutex_lock(&_mtx); - while (i < _reqs_count) { - if (_reqs[i].stream_id == stream_id) { - - // Remove orb from the orbs list - _remove_orb(_reqs[i].orb_id, _reqs[i].interval); - - // Finally, replace the current item with - // the last item in the _reqs list - _reqs[i] = _reqs[--_reqs_count]; - - } else { - // Only increment if current _reqs item is not removed. - // Otherwise we have moved the last item to the current - // position, so we need to check the same index again. - i++; + for (auto req : _reqs) { + if (req->_stream_id == stream_id) { + _reqs.remove(req); + hrt_cancel(&req->_hrt_req); + delete (req); + break; } } - // Update fds - for (int j = 0; j < _count; j++) { - _fds[j].fd = _orbs[j].fd; - _fds[j].events = POLLIN; - _fds[j].revents = 0; - } - pthread_mutex_unlock(&_mtx); - return PX4_OK; } -/** - * Set stream update interval to adjust orb polling accordingly - */ int -MavlinkStreamPoll::set_interval(uint16_t stream_id, int interval_ms) +MavlinkStreamPoll::set_interval(uint16_t stream_id, uint32_t interval_ms) { - pthread_mutex_lock(&_mtx); - - // Renew all uorb subscriptions of given stream with new interval value - for (int i = 0; i < _count; i++) { - if (_reqs[i].stream_id == stream_id) { - - // Remove orb_id subscription with current old interval - _remove_orb(_reqs[i].orb_id, _reqs[i].interval); - - // Update interval - _reqs[i].interval = interval_ms; - - // Add orb_id subscription with new interval - _add_orb(_reqs[i].orb_id, _reqs[i].interval); - - } - } - - // Update fds - for (int j = 0; j < _count; j++) { - _fds[j].fd = _orbs[j].fd; - _fds[j].events = POLLIN; - _fds[j].revents = 0; - } - - pthread_mutex_unlock(&_mtx); - return PX4_OK; + unregister_poll(stream_id); + return register_poll(stream_id, interval_ms); } /** @@ -364,27 +205,27 @@ MavlinkStreamPoll::poll(const hrt_abstime timeout) timeout_ms = 1; } - pthread_mutex_lock(&_mtx); + // Wait event for a maximum timeout time - ret = px4_poll(_fds, _count, timeout_ms); + struct timespec to; +#if defined(CONFIG_ARCH_BOARD_PX4_SITL) + px4_clock_gettime(CLOCK_MONOTONIC, &to); +#else + px4_clock_gettime(CLOCK_REALTIME, &to); +#endif + hrt_abstime now = ts_to_abstime(&to); + abstime_to_ts(&to, now + (hrt_abstime)timeout * 1000); - pthread_mutex_unlock(&_mtx); + ret = px4_sem_timedwait(&_poll_sem, &to); return ret; } -/** - * Acknowledge all orb data for next poll - */ void -MavlinkStreamPoll::ack_all() +MavlinkStreamPoll::hrt_callback(void *arg) { - pthread_mutex_lock(&_mtx); + px4_sem_post((px4_sem_t *)arg); +} - for (int i = 0; i < _count; i++) { - orb_ack(_orbs[i].fd); - } - pthread_mutex_unlock(&_mtx); -} #endif /* CONFIG_MAVLINK_UORB_POLL */ diff --git a/src/modules/mavlink/mavlink_stream.h b/src/modules/mavlink/mavlink_stream.h index cefe9bedb4f2..984292f04ced 100644 --- a/src/modules/mavlink/mavlink_stream.h +++ b/src/modules/mavlink/mavlink_stream.h @@ -44,6 +44,7 @@ #include #include #include +#include #include #include @@ -151,21 +152,6 @@ class MavlinkStream : public ListNode /** * Structure of objects in _reqs list */ -struct MavStreamOrbPollReq { - uint16_t stream_id; - ORB_ID orb_id; - int interval; -}; - -/** - * Structure of objects in _orbs list - */ -struct MavStreamPollItem { - ORB_ID orb_id; - int interval; - int usage_count; - orb_sub_t fd; -}; class MavlinkStreamPoll { @@ -176,63 +162,48 @@ class MavlinkStreamPoll /** * Add a stream to the poll list */ - int register_orbs(uint16_t stream_id, ORB_ID *orbs, int cnt); + int register_poll(uint16_t stream_id, uint32_t interval_ms); /** * Remove a stream from the poll list */ - int unregister_orbs(uint16_t stream_id); + void unregister_poll(uint16_t stream_id); /** - * Set stream update interval + * Re-set interval */ - int set_interval(uint16_t stream_id, int interval_ms); + + int set_interval(uint16_t stream_id, uint32_t interval_ms); + /** * Poll all streams for updates */ int poll(const hrt_abstime timeout); - /** - * Acknowledge all orb data for next poll - */ - void ack_all(); - private: - /** - * Add a orb_id/interval pair to the orbs list - */ - int _add_orb(ORB_ID orb_id, int interval_ms); + class MavStreamPollReq : public ListNode + { + public: + MavStreamPollReq(uint16_t stream_id) : _stream_id(stream_id) {} + uint16_t _stream_id; + struct hrt_call _hrt_req; + }; - /** - * Remove a orb_id/interval pair from the orbs list - */ - int _remove_orb(ORB_ID orb_id, int interval_ms); + static void hrt_callback(void *arg); /** - * Poll file descriptors for updates + * Requests from stream objects */ - orb_poll_struct_t *_fds; - /** - * List of different orbs to poll - */ - MavStreamPollItem *_orbs; + List _reqs; /** - * Requests from stream objects, contains orb poll requests - * count and capacity of the requests list + * Signalling semaphore to release the poll */ - MavStreamOrbPollReq *_reqs; - int _reqs_capacity; - int _reqs_count; - /** - * Count and capacity of the orbs/fds lists - */ - int _capacity; - int _count; + px4_sem_t _poll_sem; pthread_mutex_t _mtx {}; };