Skip to content

Commit 68daa49

Browse files
authored
feat(runqueue): Add id for tracking function execution (#401)
* feat(runqueue): Add id for tracking function execution * Update runqueue to generate and store an ID associated with each function, returning the id when functions are added to the queue * Allow querying of the ids currently in the queue or running * Allow removal of functions by id from the queue * Update example to test and showcase new APIs Allows callers to track if their function is currently in the queue or has finished executing (no longer in the queue or running). Also allows callers to remove the function from the queue if it is no longer needed to run. * Build and run `runqueue/example` on QtPy ESP32s3 and ensure it works. * update readme * update example and logging
1 parent ebe651d commit 68daa49

File tree

5 files changed

+236
-12
lines changed

5 files changed

+236
-12
lines changed

components/runqueue/CMakeLists.txt

+3
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,6 @@ idf_component_register(
22
INCLUDE_DIRS "include"
33
SRC_DIRS "src"
44
REQUIRES base_component task)
5+
6+
# add compiler flags to enable rtti
7+
target_compile_options(${COMPONENT_LIB} PRIVATE -frtti)

components/runqueue/example/README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,5 @@ See the Getting Started Guide for full steps to configure and use ESP-IDF to bui
2323

2424
## Example Output
2525

26-
![CleanShot 2025-03-06 at 14 38 32](https://github.com/user-attachments/assets/57dbbe6c-dc46-448b-b86f-750a19142658)
26+
![CleanShot 2025-03-07 at 22 13 10@2x](https://github.com/user-attachments/assets/dedff19e-c40f-4451-a0e1-94e2b4a8835f)
27+

components/runqueue/example/main/runqueue_example.cpp

+79-4
Original file line numberDiff line numberDiff line change
@@ -41,24 +41,90 @@ extern "C" void app_main(void) {
4141
auto task0 = std::bind(task, 0);
4242
auto task1 = std::bind(task, 1);
4343
auto task2 = std::bind(task, 2);
44-
runqueue.add_function(
44+
auto task3 = std::bind(task, 3);
45+
auto task0_id = runqueue.add_function(
4546
task0,
4647
espp::RunQueue::MIN_PRIORITY); // this will run first because it was first to be added
47-
runqueue.add_function(task1, espp::RunQueue::MIN_PRIORITY + 1); // this will run third
48-
runqueue.add_function(task2, espp::RunQueue::MIN_PRIORITY + 2); // this will run second
49-
runqueue.add_function(stop_task, espp::RunQueue::MIN_PRIORITY); // this will run last
48+
auto task1_id =
49+
runqueue.add_function(task1, espp::RunQueue::MIN_PRIORITY + 1); // this will run fourth
50+
auto task3_id =
51+
runqueue.add_function(task3, espp::RunQueue::MAX_PRIORITY); // this will run second
52+
auto task2_id =
53+
runqueue.add_function(task2, espp::RunQueue::MIN_PRIORITY + 2); // this will run third
54+
auto stop_task_id =
55+
runqueue.add_function(stop_task, espp::RunQueue::MIN_PRIORITY); // this will run last
5056
logger.info("All tasks scheduled!");
5157

58+
// print our the task ids
59+
logger.info("Task 0 id: {}", task0_id);
60+
logger.info("Task 1 id: {}", task1_id);
61+
logger.info("Task 2 id: {}", task2_id);
62+
logger.info("Task 3 id: {}", task3_id);
63+
logger.info("Stop task id: {}", stop_task_id);
64+
5265
// check the API for queue_size
5366
logger.info("Queue size: {}", runqueue.queue_size());
5467

5568
// check the API for is_running
5669
logger.info("RunQueue is running: {}", runqueue.is_running());
5770

71+
// check the API for is_function_queued
72+
logger.info("Task 0 is queued: {}", runqueue.is_function_queued(task0_id));
73+
logger.info("Task 1 is queued: {}", runqueue.is_function_queued(task1_id));
74+
logger.info("Task 2 is queued: {}", runqueue.is_function_queued(task2_id));
75+
logger.info("Task 3 is queued: {}", runqueue.is_function_queued(task3_id));
76+
logger.info("Stop task is queued: {}", runqueue.is_function_queued(stop_task_id));
77+
78+
// check the API for removing an invalid id
79+
if (runqueue.remove_function(espp::RunQueue::INVALID_ID)) {
80+
logger.error("Incorrectly Removed invalid id!");
81+
} else {
82+
logger.info("Correctly failed to remove invalid id!");
83+
}
84+
85+
// check the api for removing a valid but non-existent id
86+
if (runqueue.remove_function(999)) {
87+
logger.error("Incorrectly Removed non-existent id!");
88+
} else {
89+
logger.info("Correctly failed to remove non-existent id!");
90+
}
91+
92+
// check the api for removing the currently running id (which should fail)
93+
if (runqueue.remove_function(runqueue.get_running_id().value())) {
94+
logger.error("Incorrectly Removed currently running id!");
95+
} else {
96+
logger.info("Correctly failed to remove currently running id!");
97+
}
98+
99+
// NOTE: in the next example (below) we'll check removing a valid ID
100+
101+
// check the API for get_queued_ids(bool include_running)
102+
auto queued_ids = runqueue.get_queued_ids(true);
103+
logger.info("Queued ids (including running): {}", queued_ids);
104+
105+
// check the API for get_running_id
106+
auto running_id = runqueue.get_running_id();
107+
logger.info("Running id: {}", running_id);
108+
58109
logger.info("Waiting for stop task to complete...");
59110
std::unique_lock lock(done_mutex);
60111
done_cv.wait(lock, [&done] { return done; });
61112

113+
// check the API for get_running_id again (should return nullopt)
114+
running_id = runqueue.get_running_id();
115+
logger.info("Running id: {}", running_id);
116+
117+
// check the api for get_queued_ids again
118+
queued_ids = runqueue.get_queued_ids(true);
119+
logger.info("Queued ids (including running): {}", queued_ids);
120+
121+
// check the api for is_function_queued again
122+
logger.info("Task 0 is queued: {}", runqueue.is_function_queued(task0_id));
123+
logger.info("Task 1 is queued: {}", runqueue.is_function_queued(task1_id));
124+
logger.info("Task 2 is queued: {}", runqueue.is_function_queued(task2_id));
125+
logger.info("Task 3 is queued: {}", runqueue.is_function_queued(task3_id));
126+
logger.info("Stop task is queued: {}", runqueue.is_function_queued(stop_task_id));
127+
62128
logger.info("All tasks done!");
63129
//! [runqueue example]
64130
}
@@ -99,6 +165,8 @@ extern "C" void app_main(void) {
99165
espp::RunQueue::MIN_PRIORITY); // this will run first because it was first to be added
100166
runqueue0.add_function(task1, espp::RunQueue::MIN_PRIORITY + 1); // this will run third
101167
runqueue0.add_function(task2, espp::RunQueue::MIN_PRIORITY + 2); // this will run second
168+
auto id_to_remove = runqueue0.add_function(stop_task,
169+
espp::RunQueue::MIN_PRIORITY); // this will run last
102170

103171
runqueue1.add_function(
104172
task0,
@@ -108,6 +176,13 @@ extern "C" void app_main(void) {
108176
runqueue1.add_function(stop_task, espp::RunQueue::MIN_PRIORITY); // this will run last
109177
logger.info("All tasks scheduled!");
110178

179+
// now remove the stop task from runqueue0
180+
if (!runqueue0.remove_function(id_to_remove)) {
181+
logger.error("Failed to remove task from runqueue0!");
182+
} else {
183+
logger.info("Removed task from runqueue0!");
184+
}
185+
111186
logger.info("Waiting for stop task to complete...");
112187
std::unique_lock lock(done_mutex);
113188
done_cv.wait(lock, [&done] { return done; });

components/runqueue/include/runqueue.hpp

+66-4
Original file line numberDiff line numberDiff line change
@@ -63,18 +63,25 @@ class RunQueue : public espp::BaseComponent {
6363
using Priority = std::uint8_t;
6464

6565
/// The minimum priority a function can have.
66-
static constexpr int MIN_PRIORITY = std::numeric_limits<Priority>::min();
66+
static constexpr Priority MIN_PRIORITY = std::numeric_limits<Priority>::min();
6767

6868
/// The maximum priority a function can have.
69-
static constexpr int MAX_PRIORITY = std::numeric_limits<Priority>::max();
69+
static constexpr Priority MAX_PRIORITY = std::numeric_limits<Priority>::max();
70+
71+
/// The type used to represent the id of a function.
72+
using Id = std::uint32_t;
73+
74+
/// The invalid id value. This is used to know if an id is valid or not.
75+
static constexpr Id INVALID_ID = std::numeric_limits<Id>::min();
7076

7177
/// A function that takes no arguments and returns void.
7278
using Function = std::function<void(void)>;
7379
// typedef std::function<void(void)> Function;
7480

7581
/// A pair of a priority and a function.
7682
struct PriorityFunction {
77-
Priority priority; ///< The priority of the function.
83+
Priority priority; ///< The priority of the function. Lower values have lower priority.
84+
Id id; ///< The id of the function. Can be provided or auto-generated.
7885
Function function; ///< The function.
7986
};
8087

@@ -94,6 +101,14 @@ class RunQueue : public espp::BaseComponent {
94101
return lhs.priority > rhs.priority;
95102
}
96103

104+
/// Equality operator for PriorityFunction.
105+
/// \param lhs The left hand side of the comparison.
106+
/// \param rhs The right hand side of the comparison.
107+
/// \return True if the left and right hand side have the same priority and id.
108+
friend bool operator==(const PriorityFunction &lhs, const PriorityFunction &rhs) {
109+
return lhs.priority == rhs.priority && lhs.id == rhs.id;
110+
}
111+
97112
/// Configuration struct for the RunQueue
98113
struct Config {
99114
bool auto_start = true; ///< Whether the RunQueue should start automatically.
@@ -121,12 +136,57 @@ class RunQueue : public espp::BaseComponent {
121136
void start();
122137

123138
/// Stop the run queue.
139+
/// \note This must wait until the currently running function (if any) has
140+
/// completed before stopping the run queue.
124141
void stop();
125142

126143
/// Add a function to the queue.
127144
/// \param function The function to add.
128145
/// \param priority The priority of the function. Defaults to MIN_PRIORITY.
129-
void add_function(const Function &function, Priority priority = MIN_PRIORITY);
146+
/// \return The id of the function. This will be auto-generated and can be
147+
/// used to query or remove the function from the queue.
148+
Id add_function(const Function &function, Priority priority = MIN_PRIORITY);
149+
150+
/// Remove a function from the queue.
151+
/// \param id The id of the function to remove. Cannot be INVALID_ID.
152+
/// \return True if the function was removed.
153+
/// \note This will not remove or stop the currently running function (if
154+
/// any).
155+
/// \note This will return false if the id is INVALID_ID.
156+
/// \note This will return false if the id is the id of the currently running
157+
/// function.
158+
bool remove_function(Id id);
159+
160+
/// Check if a function is queued or running.
161+
/// \param id The id of the function to check. Cannot be INVALID_ID.
162+
/// \return True if the function is queued or is currently running.
163+
/// \note This will return false if the the id is INVALID_ID.
164+
bool is_function_queued(Id id);
165+
166+
/// Remove all functions from the queue.
167+
/// \note This will not remove or stop the currently running function (if
168+
/// any).
169+
void clear_queue();
170+
171+
/// Get the ids of all functions in the queue in order of priority.
172+
/// \param include_running Whether to include the id of the currently running
173+
/// function (if any) in the returned vector.
174+
/// \return A vector of the ids of all functions in the queue, optionally
175+
/// including the id of the currently running function. Maybe empty if
176+
/// there are no functions queued or running.
177+
/// \note The vector will be in order of priority, with the highest priority
178+
/// function at the end of the vector. This means that if include_running
179+
/// is true, the id of the currently running function will be the last
180+
/// element in the vector.
181+
std::vector<Id> get_queued_ids(bool include_running = false);
182+
183+
/// Get the id of the currently running function.
184+
/// \return The id of the currently running function, or std::nullopt if no
185+
/// function is currently running.
186+
/// \note This may return nullopt if the currently running function has
187+
/// completed but the runner task has not yet fetched the next function
188+
/// from the queue.
189+
std::optional<Id> get_running_id();
130190

131191
protected:
132192
/// Manage the run queue.
@@ -140,6 +200,8 @@ class RunQueue : public espp::BaseComponent {
140200
bool task_fn(std::mutex &m, std::condition_variable &cv, bool &task_notified);
141201

142202
std::unique_ptr<espp::Task> runner_;
203+
std::atomic<Id> id_counter_{INVALID_ID};
204+
std::atomic<Id> running_id_{INVALID_ID};
143205
std::mutex queue_mutex_;
144206
std::condition_variable queue_cv_;
145207
bool queue_notified_ = false;

components/runqueue/src/runqueue.cpp

+86-3
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,89 @@ void RunQueue::stop() {
3636
runner_->stop();
3737
}
3838

39-
void RunQueue::add_function(const Function &function, Priority priority) {
40-
logger_.debug("Adding function to queue with priority: {}", priority);
39+
RunQueue::Id RunQueue::add_function(const Function &function, Priority priority) {
40+
// generate an Id for this function
41+
Id id = ++id_counter_; // pre-increment so the first id is 1
42+
if (id == INVALID_ID) {
43+
// if the id is min, then increment again. this should be rare, and only
44+
// happen if the id_counter_ overflows
45+
id = ++id_counter_;
46+
}
4147
std::unique_lock lock(queue_mutex_);
42-
priority_function_queue_.insert({priority, function});
48+
priority_function_queue_.insert({priority, id, function});
4349
// notify the queue so that the runner wakes up and runs the function
4450
queue_cv_.notify_all();
51+
return id;
52+
}
53+
54+
bool RunQueue::remove_function(RunQueue::Id id) {
55+
// return false if the id is invalid
56+
if (id == INVALID_ID) {
57+
logger_.debug("Cannot remove function with id: {}, it is invalid", id);
58+
return false;
59+
}
60+
// return false if the function is the one currently running, as we cannot
61+
// remove the running function
62+
if (id == running_id_) {
63+
logger_.debug("Cannot remove function with id: {}, it is running", id);
64+
return false;
65+
}
66+
logger_.debug("Removing function from queue with id: {}", id);
67+
std::unique_lock lock(queue_mutex_);
68+
auto it = std::find_if(priority_function_queue_.begin(), priority_function_queue_.end(),
69+
[&](const auto &pf) { return pf.id == id; });
70+
if (it != priority_function_queue_.end()) {
71+
priority_function_queue_.erase(it);
72+
return true;
73+
}
74+
logger_.debug("Function with id: {} not found in queue", id);
75+
return false;
76+
}
77+
78+
bool RunQueue::is_function_queued(RunQueue::Id id) {
79+
// return false if the id is invalid
80+
if (id == INVALID_ID) {
81+
return false;
82+
}
83+
// return true if the function is the one currently running
84+
if (id == running_id_) {
85+
return true;
86+
}
87+
// otherwise, check if the function is in the queue
88+
std::unique_lock lock(queue_mutex_);
89+
auto it = std::find_if(priority_function_queue_.begin(), priority_function_queue_.end(),
90+
[&](const auto &pf) { return pf.id == id; });
91+
// return whether or not the function was found
92+
return it != priority_function_queue_.end();
93+
}
94+
95+
void RunQueue::clear_queue() {
96+
logger_.debug("Clearing queue");
97+
std::unique_lock lock(queue_mutex_);
98+
priority_function_queue_.clear();
99+
}
100+
101+
std::vector<RunQueue::Id> RunQueue::get_queued_ids(bool include_running) {
102+
std::vector<Id> ids;
103+
// Note: the vector will be in order of priority, with the highest priority
104+
// function at the end of the vector
105+
{
106+
std::unique_lock lock(queue_mutex_);
107+
for (const auto &pf : priority_function_queue_) {
108+
ids.push_back(pf.id);
109+
}
110+
}
111+
if (include_running && running_id_ != INVALID_ID) {
112+
ids.push_back(running_id_);
113+
}
114+
return ids;
115+
}
116+
117+
std::optional<RunQueue::Id> RunQueue::get_running_id() {
118+
if (running_id_ != INVALID_ID) {
119+
return running_id_;
120+
}
121+
return std::nullopt;
45122
}
46123

47124
bool RunQueue::manage_queue() {
@@ -62,10 +139,16 @@ bool RunQueue::manage_queue() {
62139
priority_function_queue_.erase(std::next(top).base());
63140
}
64141

142+
// set the running id
143+
running_id_ = highest_priority_function.id;
144+
65145
logger_.debug("Running function with priority: {}", highest_priority_function.priority);
66146
// run the function
67147
highest_priority_function.function();
68148

149+
// clear the running id
150+
running_id_ = INVALID_ID;
151+
69152
// return whether or not there are more functions in the queue
70153
std::unique_lock lock(queue_mutex_);
71154
return !priority_function_queue_.empty();

0 commit comments

Comments
 (0)