Skip to content

Commit

Permalink
core/thread_flags: add thread_flags_wait_any_or_mbox()
Browse files Browse the repository at this point in the history
The new function blocks until either any of given set of flags is set
or a message was fetch from the given mbox - whatever comes first.
  • Loading branch information
maribu committed Nov 25, 2022
1 parent f1a8e1f commit e5ce3d5
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 6 deletions.
2 changes: 1 addition & 1 deletion core/include/mbox.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
#ifndef MBOX_H
#define MBOX_H

#include "list.h"
#include "cib.h"
#include "list.h"
#include "msg.h"

#ifdef __cplusplus
Expand Down
1 change: 1 addition & 0 deletions core/include/sched.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ typedef enum {
STATUS_REPLY_BLOCKED, /**< waiting for a message response */
STATUS_FLAG_BLOCKED_ANY, /**< waiting for any flag from flag_mask */
STATUS_FLAG_BLOCKED_ALL, /**< waiting for all flags in flag_mask */
STATUS_FLAG_MBOX_BLOCKED, /**< waiting for any flag or an mbox msg */
STATUS_MBOX_BLOCKED, /**< waiting for get/put on mbox */
STATUS_COND_BLOCKED, /**< waiting for a condition variable */
STATUS_RUNNING, /**< currently running */
Expand Down
21 changes: 21 additions & 0 deletions core/include/thread_flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
#define THREAD_FLAGS_H

#include "sched.h" /* for thread_t typedef */
#include "mbox.h"

#ifdef __cplusplus
extern "C" {
Expand Down Expand Up @@ -101,6 +102,10 @@ extern "C" {
* @see xtimer_set_timeout_flag
*/
#define THREAD_FLAG_TIMEOUT (1u << 14)
/**
* @brief Flag used to wake @ref thread_flags_wait_any_or_mbox
*/
#define THREAD_FLAG_MBOX_READY (1u << 13)

/**
* @brief Comprehensive set of all predefined flags
Expand Down Expand Up @@ -170,6 +175,22 @@ thread_flags_t thread_flags_wait_any(thread_flags_t mask);
*/
thread_flags_t thread_flags_wait_all(thread_flags_t mask);

/**
* @brief Wait for any flag or a message via mbox, whatever comes first
*
* @param[in,out] mbox ptr to the mailbox to operate on
* @param[out] msg destination where to store the retrieved message
* @param[in] mask thread_flags mask to wait on
*
* @retval 0 A message came in first
* @retval != A flag came in first
* @return The flags that caused the wake up, or `0` if a message came in first
*
* @note This is only available when module @ref core_mbox is used
*/
thread_flags_t thread_flags_wait_any_or_mbox(mbox_t *mbox, msg_t *msg,
thread_flags_t mask);

/**
* @brief Wait for any flags in mask to become set (blocking), one at a time
*
Expand Down
32 changes: 27 additions & 5 deletions core/mbox.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@

#include <string.h>

#include "mbox.h"
#include "irq.h"
#include "mbox.h"
#include "sched.h"
#include "thread.h"
#include "thread_flags.h"

#define ENABLE_DEBUG 0
#include "debug.h"
Expand Down Expand Up @@ -66,10 +67,31 @@ int _mbox_put(mbox_t *mbox, msg_t *msg, int blocking)
if (next) {
DEBUG("mbox: Thread %" PRIkernel_pid " mbox 0x%08x: _tryput(): "
"there's a waiter.\n", thread_getpid(), (unsigned)mbox);
thread_t *thread =
container_of((clist_node_t *)next, thread_t, rq_entry);
*(msg_t *)thread->wait_data = *msg;
_wake_waiter(thread, irqstate);
thread_t *thread = container_of((clist_node_t *)next, thread_t,
rq_entry);
if (!IS_USED(MODULE_CORE_THREAD_FLAGS) ||
(thread->status == STATUS_MBOX_BLOCKED)) {
*(msg_t *)thread->wait_data = *msg;
_wake_waiter(thread, irqstate);
}
else {
while (cib_full(&mbox->cib)) {
if (!blocking) {
/* put thread back into list and give up */
thread_add_to_list(&mbox->readers, thread);
irq_restore(irqstate);
return 0;
}

_wait(&mbox->writers, irqstate);
irqstate = irq_disable();
}

/* copy message to queue and set flags */
mbox->msg_array[cib_put_unsafe(&mbox->cib)] = *msg;
irq_restore(irqstate);
thread_flags_set(thread, THREAD_FLAG_MBOX_READY);
}
return 1;
}
else {
Expand Down
1 change: 1 addition & 0 deletions core/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ static const char *state_names[STATUS_NUMOF] = {
[STATUS_REPLY_BLOCKED] = "bl reply",
[STATUS_FLAG_BLOCKED_ANY] = "bl anyfl",
[STATUS_FLAG_BLOCKED_ALL] = "bl allfl",
[STATUS_FLAG_MBOX_BLOCKED] = "bl fl|mbox",
[STATUS_MBOX_BLOCKED] = "bl mbox",
[STATUS_COND_BLOCKED] = "bl cond",
[STATUS_RUNNING] = "running",
Expand Down
55 changes: 55 additions & 0 deletions core/thread_flags.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,18 @@

#include "thread_flags.h"
#include "irq.h"
#include "list.h"
#include "sched.h"
#include "thread.h"

#define ENABLE_DEBUG 0
#include "debug.h"

struct flags_any_or_mbox_data {
mbox_t *mbox;
thread_flags_t flags;
};

static inline int __attribute__((always_inline)) _thread_flags_wake(
thread_t *thread)
{
Expand All @@ -38,6 +45,18 @@ static inline int __attribute__((always_inline)) _thread_flags_wake(
case STATUS_FLAG_BLOCKED_ALL:
wakeup = ((thread->flags & mask) == mask);
break;
#ifdef MODULE_CORE_MBOX
case STATUS_FLAG_MBOX_BLOCKED:
{
struct flags_any_or_mbox_data *data = thread->wait_data;
wakeup = thread->flags & data->flags;
if (wakeup) {
list_remove(&data->mbox->readers,
(list_node_t *)&thread->rq_entry);
}
}
break;
#endif
default:
wakeup = 0;
break;
Expand Down Expand Up @@ -157,3 +176,39 @@ void thread_flags_set(thread_t *thread, thread_flags_t mask)
irq_restore(state);
}
}

#ifdef MODULE_CORE_MBOX
thread_flags_t thread_flags_wait_any_or_mbox(mbox_t *mbox, msg_t *msg,
thread_flags_t mask)
{
thread_t *me = thread_get_active();
thread_flags_t flags;

struct flags_any_or_mbox_data wait_data = {
.mbox = mbox,
.flags = mask | THREAD_FLAG_MBOX_READY,
};

while (1) {
if (mbox_try_get(mbox, msg)) {
thread_flags_clear(THREAD_FLAG_MBOX_READY);
return 0;
}

flags = _thread_flags_clear_atomic(me, mask);
if (flags) {
return flags;
}

unsigned irqstate = irq_disable();
me->wait_data = &wait_data;
sched_set_status(me, STATUS_FLAG_MBOX_BLOCKED);
thread_add_to_list(&mbox->readers, me);
irq_restore(irqstate);
thread_yield_higher();
irqstate = irq_disable();
sched_set_status(me, STATUS_RUNNING);
irq_restore(irqstate);
}
}
#endif

0 comments on commit e5ce3d5

Please sign in to comment.