Skip to content

Commit 10f450a

Browse files
authored
feat(runqueue): Add espp::RunQueue component (#397)
* feat(runqueue): Add `espp::RunQueue` component * Add new `RunQueue` component for asynchronously running functions in a separate thread without having to allocate separate thread objects and with support for prioritization The RunQueue helps minimize wasted heap when you need to trigger many different functions to run asynchronously and sporadically - preventing you from having to allocate different tasks statically for them, when you are OK with prioritized / best-effort execution of the functions. Build and run the new `runqueue/example` on QtPy ESP32s3 * add screenshot to readme
1 parent 627dd81 commit 10f450a

13 files changed

+458
-1
lines changed

.github/workflows/build.yml

+2
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ jobs:
125125
target: esp32s3
126126
- path: 'components/rtsp/example'
127127
target: esp32
128+
- path: 'components/runqueue/example'
129+
target: esp32
128130
- path: 'components/seeed-studio-round-display/example'
129131
target: esp32s3
130132
- path: 'components/serialization/example'

components/runqueue/CMakeLists.txt

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
idf_component_register(
2+
INCLUDE_DIRS "include"
3+
SRC_DIRS "src"
4+
REQUIRES base_component task)
+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# The following lines of boilerplate have to be in your project's CMakeLists
2+
# in this exact order for cmake to work correctly
3+
cmake_minimum_required(VERSION 3.5)
4+
5+
include($ENV{IDF_PATH}/tools/cmake/project.cmake)
6+
7+
# add the component directories that we want to use
8+
set(EXTRA_COMPONENT_DIRS
9+
"../../../components/"
10+
)
11+
12+
set(
13+
COMPONENTS
14+
"main esptool_py runqueue"
15+
CACHE STRING
16+
"List of components to include"
17+
)
18+
19+
project(runqueue_example)
20+
21+
set(CMAKE_CXX_STANDARD 20)

components/runqueue/example/README.md

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# RunQueue Example
2+
3+
This example shows how you can use the `espp::RunQueue` to schedule functions to
4+
run asynchronously with priority ordering. You can configure the RunQueue task
5+
so that you have different RunQueue tasks for different groups of function
6+
priorities as well as so that you can have a RunQueue on each core if you like.
7+
8+
## How to use example
9+
10+
### Build and Flash
11+
12+
Build the project and flash it to the board, then run monitor tool to view serial output:
13+
14+
```
15+
idf.py -p PORT flash monitor
16+
```
17+
18+
(Replace PORT with the name of the serial port to use.)
19+
20+
(To exit the serial monitor, type ``Ctrl-]``.)
21+
22+
See the Getting Started Guide for full steps to configure and use ESP-IDF to build projects.
23+
24+
## Example Output
25+
26+
![CleanShot 2025-03-06 at 14 38 32](https://github.com/user-attachments/assets/57dbbe6c-dc46-448b-b86f-750a19142658)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
idf_component_register(SRC_DIRS "."
2+
INCLUDE_DIRS ".")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
#include <chrono>
2+
#include <vector>
3+
4+
#include "logger.hpp"
5+
#include "runqueue.hpp"
6+
7+
using namespace std::chrono_literals;
8+
9+
extern "C" void app_main(void) {
10+
espp::Logger logger({.tag = "RunQueue Example", .level = espp::Logger::Verbosity::DEBUG});
11+
12+
{
13+
logger.info("Basic runqueue example!");
14+
//! [runqueue example]
15+
espp::RunQueue runqueue({.log_level = espp::Logger::Verbosity::DEBUG});
16+
17+
// make some functions to run and schedule them with different priorities to
18+
// show the priority queue works
19+
20+
std::mutex done_mutex;
21+
std::condition_variable done_cv;
22+
bool done = false;
23+
24+
auto task = [&logger](int id) {
25+
auto core = xPortGetCoreID();
26+
logger.info("Task {} running on core {}", id, core);
27+
std::this_thread::sleep_for(1s);
28+
logger.info("Task {} done on core {}", id, core);
29+
};
30+
31+
auto stop_task = [&]() {
32+
logger.info("stop task running!");
33+
std::this_thread::sleep_for(1s);
34+
logger.info("Stop task done!");
35+
std::unique_lock lock(done_mutex);
36+
done = true;
37+
done_cv.notify_one();
38+
};
39+
40+
logger.info("Scheduling tasks...");
41+
auto task0 = std::bind(task, 0);
42+
auto task1 = std::bind(task, 1);
43+
auto task2 = std::bind(task, 2);
44+
runqueue.add_function(
45+
task0,
46+
espp::RunQueue::MIN_PRIORITY); // this will run first because it was first to be added
47+
runqueue.add_function(task1, espp::RunQueue::MIN_PRIORITY + 1); // this will run third
48+
runqueue.add_function(task2, espp::RunQueue::MIN_PRIORITY + 2); // this will run second
49+
runqueue.add_function(stop_task, espp::RunQueue::MIN_PRIORITY); // this will run last
50+
logger.info("All tasks scheduled!");
51+
52+
// check the API for queue_size
53+
logger.info("Queue size: {}", runqueue.queue_size());
54+
55+
// check the API for is_running
56+
logger.info("RunQueue is running: {}", runqueue.is_running());
57+
58+
logger.info("Waiting for stop task to complete...");
59+
std::unique_lock lock(done_mutex);
60+
done_cv.wait(lock, [&done] { return done; });
61+
62+
logger.info("All tasks done!");
63+
//! [runqueue example]
64+
}
65+
66+
{
67+
logger.info("Multiple runqueue example (on different cores)!");
68+
//! [multiple runqueue example]
69+
std::mutex done_mutex;
70+
std::condition_variable done_cv;
71+
bool done = false;
72+
73+
auto task = [&logger](int id) {
74+
auto core = xPortGetCoreID();
75+
logger.info("Task {} running on core {}", id, core);
76+
std::this_thread::sleep_for(1s);
77+
logger.info("Task {} done on core {}", id, core);
78+
};
79+
80+
auto stop_task = [&]() {
81+
auto core = xPortGetCoreID();
82+
logger.info("stop task running on core {}", core);
83+
std::this_thread::sleep_for(1s);
84+
logger.info("Stop task done on core {}", core);
85+
std::unique_lock lock(done_mutex);
86+
done = true;
87+
done_cv.notify_one();
88+
};
89+
90+
espp::RunQueue runqueue0({.task_config = {.name = "core0 runq", .core_id = 0}});
91+
espp::RunQueue runqueue1({.task_config = {.name = "core1 runq", .core_id = 1}});
92+
93+
logger.info("Scheduling tasks...");
94+
auto task0 = std::bind(task, 0);
95+
auto task1 = std::bind(task, 1);
96+
auto task2 = std::bind(task, 2);
97+
runqueue0.add_function(
98+
task0,
99+
espp::RunQueue::MIN_PRIORITY); // this will run first because it was first to be added
100+
runqueue0.add_function(task1, espp::RunQueue::MIN_PRIORITY + 1); // this will run third
101+
runqueue0.add_function(task2, espp::RunQueue::MIN_PRIORITY + 2); // this will run second
102+
103+
runqueue1.add_function(
104+
task0,
105+
espp::RunQueue::MIN_PRIORITY); // this will run first because it was first to be added
106+
runqueue1.add_function(task1, espp::RunQueue::MIN_PRIORITY + 1); // this will run third
107+
runqueue1.add_function(task2, espp::RunQueue::MIN_PRIORITY + 2); // this will run second
108+
runqueue1.add_function(stop_task, espp::RunQueue::MIN_PRIORITY); // this will run last
109+
logger.info("All tasks scheduled!");
110+
111+
logger.info("Waiting for stop task to complete...");
112+
std::unique_lock lock(done_mutex);
113+
done_cv.wait(lock, [&done] { return done; });
114+
//! [multiple runqueue example]
115+
}
116+
117+
logger.info("Example complete!");
118+
119+
while (true) {
120+
std::this_thread::sleep_for(1s);
121+
}
122+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Common ESP-related
2+
#
3+
CONFIG_ESP_SYSTEM_EVENT_TASK_STACK_SIZE=4096
4+
CONFIG_ESP_MAIN_TASK_STACK_SIZE=8192
+148
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
#pragma once
2+
3+
#include <deque>
4+
#include <functional>
5+
#include <mutex>
6+
#include <set>
7+
#include <utility>
8+
9+
#include "base_component.hpp"
10+
#include "task.hpp"
11+
12+
namespace espp {
13+
/// This class implements a run queue for generic functions.
14+
///
15+
/// The functions can be added to the queue with a priority. The functions are
16+
/// executed in the order of their priority, but the currently executing
17+
/// function will run to completion before the next function is executed.
18+
///
19+
/// The RunQueue is implemented as a Task that runs the functions in the queue.
20+
/// The Task will run the highest priority function in the queue (if any) and
21+
/// will block indefinitely until either either a new function is added to the
22+
/// queue or the RunQueue is stopped.
23+
///
24+
/// \note Function priorities are relative to each other and are only compared
25+
/// to other tasks in the specific RunQueue object. Different RunQueue
26+
/// objects may be active at the same time and the priorities of
27+
/// functions in one RunQueue are not compared to the priorities of
28+
/// functions in another. Similarly, the priorities of functions is not
29+
/// taken into account in the FreeRTOS scheduler - all functions in a
30+
/// given RunQueue will run with the same task priority as the
31+
/// RunQueue's task.
32+
///
33+
/// Because the RunQueue is implemented as an espp::Task, you can customize
34+
/// its task configuration, such as priority, stack size, core id, etc.
35+
///
36+
/// \note All functions within a RunQueue share the same task context, so you
37+
/// should set the stack size for the RunQueue accordingly.
38+
///
39+
/// The RunQueue can be configured to start automatically or manually. If the
40+
/// RunQueue is configured to start automatically, it will start when it is
41+
/// constructed. If it is configured to start manually, it will not start until
42+
/// the start() method is called.
43+
///
44+
/// The RunQueue is thread-safe and can be used from multiple threads.
45+
///
46+
/// \warn Care should be taken to ensure that no functions in the queue are
47+
/// blocking on each other, and all functions in the queue must return
48+
/// (they cannot be infinite loops).
49+
///
50+
/// \warn Functions take a long time to run may delay or prevent the execution
51+
/// of other functions in the queue. For this reason it's recommended to
52+
/// try to keep the functions as short-lived as possible, and to
53+
/// minimize the priorities of any functions which take longer to
54+
/// execute.
55+
///
56+
/// \section runq_ex0 RunQueue Example
57+
/// \snippet runqueue_example.cpp runqueue example
58+
/// \section runq_ex1 Multiple RunQueues Example
59+
/// \snippet runqueue_example.cpp multiple runqueues example
60+
class RunQueue : public espp::BaseComponent {
61+
public:
62+
/// The type used to represent the priority of a function.
63+
using Priority = std::uint8_t;
64+
65+
/// The minimum priority a function can have.
66+
static constexpr int MIN_PRIORITY = std::numeric_limits<Priority>::min();
67+
68+
/// The maximum priority a function can have.
69+
static constexpr int MAX_PRIORITY = std::numeric_limits<Priority>::max();
70+
71+
/// A function that takes no arguments and returns void.
72+
using Function = std::function<void(void)>;
73+
// typedef std::function<void(void)> Function;
74+
75+
/// A pair of a priority and a function.
76+
struct PriorityFunction {
77+
Priority priority; ///< The priority of the function.
78+
Function function; ///< The function.
79+
};
80+
81+
/// Less than operator for PriorityFunction.
82+
/// \param lhs The left hand side of the comparison.
83+
/// \param rhs The right hand side of the comparison.
84+
/// \return True if the right hand side has a greater priority.
85+
friend bool operator<(const PriorityFunction &lhs, const PriorityFunction &rhs) {
86+
return lhs.priority < rhs.priority;
87+
}
88+
89+
/// Greater than operator for PriorityFunction.
90+
/// \param lhs The left hand side of the comparison.
91+
/// \param rhs The right hand side of the comparison.
92+
/// \return True if the left hand side has a greater priority.
93+
friend bool operator>(const PriorityFunction &lhs, const PriorityFunction &rhs) {
94+
return lhs.priority > rhs.priority;
95+
}
96+
97+
/// Configuration struct for the RunQueue
98+
struct Config {
99+
bool auto_start = true; ///< Whether the RunQueue should start automatically.
100+
espp::Task::BaseConfig task_config = {}; ///< The configuration for the runner task.
101+
espp::Logger::Verbosity log_level =
102+
espp::Logger::Verbosity::WARN; ///< The log level for the RunQueue.
103+
};
104+
105+
/// Construct a RunQueue.
106+
/// \param config The configuration for the RunQueue.
107+
explicit RunQueue(const Config &config);
108+
109+
/// Destroy the RunQueue.
110+
~RunQueue();
111+
112+
/// Get the number of functions in the queue.
113+
/// \return The number of functions in the queue.
114+
std::size_t queue_size() const;
115+
116+
/// Get whether the run queue is running.
117+
/// \return True if the run queue is running.
118+
bool is_running() const;
119+
120+
/// Start the run queue.
121+
void start();
122+
123+
/// Stop the run queue.
124+
void stop();
125+
126+
/// Add a function to the queue.
127+
/// \param function The function to add.
128+
/// \param priority The priority of the function. Defaults to MIN_PRIORITY.
129+
void add_function(const Function &function, Priority priority = MIN_PRIORITY);
130+
131+
protected:
132+
/// Manage the run queue.
133+
/// \details This function is called by the runner task to manage the run
134+
/// queue. It will run the highest priority function in the queue
135+
/// (if any) and will return true if there are more functions to
136+
/// run.
137+
/// \return True if there are more functions to run.
138+
bool manage_queue();
139+
140+
bool task_fn(std::mutex &m, std::condition_variable &cv, bool &task_notified);
141+
142+
std::unique_ptr<espp::Task> runner_;
143+
std::mutex queue_mutex_;
144+
std::condition_variable queue_cv_;
145+
bool queue_notified_ = false;
146+
std::multiset<PriorityFunction> priority_function_queue_;
147+
}; // class RunQueue
148+
} // namespace espp

0 commit comments

Comments
 (0)