Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

uorb: Move global _callback_ptr to per thread callback pointer #505

Merged
merged 1 commit into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion platforms/common/uORB/uORBDeviceNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ uORB::DeviceNode::write(const char *buffer, const orb_metadata *meta, orb_advert
#ifdef CONFIG_BUILD_FLAT
item->subscriber->call();
#else
Manager::queueCallback(item->subscriber);
Manager::queueCallback(item->subscriber, item->lock);
#endif
}

Expand Down
10 changes: 1 addition & 9 deletions platforms/common/uORB/uORBManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,19 +179,12 @@ uORB::Manager::Manager()
PX4_DEBUG("SEM INIT FAIL: ret %d", ret);
}

ret = px4_sem_init(&_callback_lock, 1, 1);

if (ret != 0) {
PX4_DEBUG("SEM INIT FAIL: ret %d", ret);
}

g_sem_pool.init();
}

uORB::Manager::~Manager()
{
px4_sem_destroy(&_lock);
px4_sem_destroy(&_callback_lock);
}

int uORB::Manager::orb_exists(const struct orb_metadata *meta, int instance)
Expand Down Expand Up @@ -493,8 +486,7 @@ uORB::Manager::callback_thread(int argc, char *argv[])
while (true) {
lockThread(per_process_lock);

SubscriptionCallback *sub = _Instance->_callback_ptr;
_Instance->unlock_callbacks();
SubscriptionCallback *sub = dequeueCallback(per_process_lock);

// Pass nullptr to this thread to exit
if (sub == nullptr) {
Expand Down
45 changes: 25 additions & 20 deletions platforms/common/uORB/uORBManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,13 +444,20 @@ class Manager

static int8_t getThreadLock() {return _Instance->g_sem_pool.reserve();}

static void queueCallback(class SubscriptionCallback *sub)
static void queueCallback(class SubscriptionCallback *sub, int idx)
{
_Instance->lock_callbacks();
_Instance->_callback_ptr = sub;
_Instance->g_sem_pool.cb_lock(idx);
_Instance->g_sem_pool.cb_set(idx, sub);
// The manager is unlocked in callback thread
}

static class SubscriptionCallback *dequeueCallback(int idx)
{
class SubscriptionCallback *sub = _Instance->g_sem_pool.cb_get(idx);
_Instance->g_sem_pool.cb_unlock(idx);
return sub;
}

static bool isThreadAlive(int idx)
{
int value = _Instance->g_sem_pool.value(idx);
Expand Down Expand Up @@ -616,17 +623,6 @@ class Manager
// Global cache for advertised uORB node instances
uint16_t g_has_publisher[ORB_TOPICS_COUNT + 1];

// This (system global) variable is used to pass the subsriber
// pointer to the callback thread. This is in Manager, since
// it needs to be mapped for both advertisers and the subscribers
class SubscriptionCallback *_callback_ptr {nullptr};

// This mutex protects the above pointer for one writer at a time
px4_sem_t _callback_lock;

void lock_callbacks() { do {} while (px4_sem_wait(&_callback_lock) != 0); }
void unlock_callbacks() { px4_sem_post(&_callback_lock); }

// A global pool of semaphores for
// 1) poll locks
// 2) callback thread signalling (except in NuttX flat build)
Expand All @@ -645,14 +641,21 @@ class Manager
void release(int8_t i) {_global_sem[i].release(); }
int value(int8_t i) { return _global_sem[i].value(); }

void cb_lock(int8_t i) { do {} while (_global_sem[i].cb_lock() != 0); }
void cb_unlock(int8_t i) { _global_sem[i].cb_unlock(); }
void cb_set(int8_t i, struct SubscriptionCallback *callback_ptr) { _global_sem[i].cb_set(callback_ptr); }
struct SubscriptionCallback *cb_get(int8_t i) { return _global_sem[i].cb_get(); }

class GlobalLock
{
public:
void init()
{
px4_sem_init(&_sem, 1, 0);
px4_sem_init(&_lock, 1, 1);
#if __PX4_NUTTX
sem_setprotocol(&_sem, SEM_PRIO_NONE);
sem_setprotocol(&_lock, SEM_PRIO_NONE);
#endif
in_use = false;
}
Expand All @@ -670,16 +673,18 @@ class Manager
int value() { int value; px4_sem_getvalue(&_sem, &value); return value; }
bool in_use{false};

int cb_lock() { return px4_sem_wait(&_lock); }
void cb_unlock() { px4_sem_post(&_lock); }
void cb_set(struct SubscriptionCallback *callback_ptr) { _callback_ptr = callback_ptr; }
struct SubscriptionCallback *cb_get() { return _callback_ptr; }
private:
struct SubscriptionCallback *subscriber;
px4_sem_t _sem;
struct SubscriptionCallback *_callback_ptr {nullptr};
px4_sem_t _sem; /* For signaling to the callback thread */
px4_sem_t _lock; /* For signaling back from the callback thread */
};
private:

void lock()
{
do {} while (px4_sem_wait(&_semLock) != 0);
}
void lock() { do {} while (px4_sem_wait(&_semLock) != 0); }
void unlock() { px4_sem_post(&_semLock); }

GlobalLock _global_sem[NUM_GLOBAL_SEMS];
Expand Down