Skip to content

Commit

Permalink
Reimplement TaskQueueProducer with std::queue
Browse files Browse the repository at this point in the history
Old implementation would break if C++ objects were transmitted
through it.
  • Loading branch information
mairas committed Oct 7, 2024
1 parent 1ee5207 commit c11a10f
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 34 deletions.
2 changes: 1 addition & 1 deletion src/sensesp/signalk/signalk_ws_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class SKWSClient : virtual public FileSystemSaveable,
SKDeltaQueue* sk_delta_queue_;
/// @brief Emits the number of deltas sent since last report
TaskQueueProducer<int> delta_tx_tick_producer_ =
TaskQueueProducer<int>(0, event_loop(), 5, 990);
TaskQueueProducer<int>(0, event_loop(), 990);
Integrator<int, int> delta_tx_count_producer_{1, 0, ""};
Integrator<int, int> delta_rx_count_producer_{1, 0, ""};

Expand Down
128 changes: 95 additions & 33 deletions src/sensesp/system/task_queue_producer.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,67 @@
#ifndef SENSESP_SYSTEM_TASK_QUEUE_PRODUCER_H_
#define SENSESP_SYSTEM_TASK_QUEUE_PRODUCER_H_

#include <limits>
#include <queue>

#include "ReactESP.h"
#include "observablevalue.h"
#include "sensesp_base_app.h"

namespace sensesp {

/**
* @brief Thread-safe queue for inter-task communication. Works like std::queue.
*
* @tparam T
*/
template <typename T>
class SafeQueue : public std::queue<T> {
public:
SafeQueue() : std::queue<T>() {
queue_semaphore_ =
xSemaphoreCreateCounting(std::numeric_limits<int>::max(), 0);
write_lock_ = xSemaphoreCreateMutex();
}

void push(const T& value) {
xSemaphoreTake(write_lock_, portMAX_DELAY);
std::queue<T>::push(value);
xSemaphoreGive(queue_semaphore_);
xSemaphoreGive(write_lock_);
}

bool pop(T& value, unsigned int max_duration_ms) {
if (xSemaphoreTake(queue_semaphore_,
max_duration_ms / portTICK_PERIOD_MS) == pdTRUE) {
xSemaphoreTake(write_lock_, portMAX_DELAY);
value = std::queue<T>::front();
std::queue<T>::pop();
xSemaphoreGive(write_lock_);
return true;
}
return false;
}

bool empty() {
xSemaphoreTake(write_lock_, portMAX_DELAY);
bool result = std::queue<T>::empty();
xSemaphoreGive(write_lock_);
return result;
}

size_t size() {
xSemaphoreTake(write_lock_, portMAX_DELAY);
size_t result = std::queue<T>::size();
xSemaphoreGive(write_lock_);
return result;
}

protected:
SemaphoreHandle_t queue_semaphore_; // Mirrors the items in the queue
SemaphoreHandle_t write_lock_; // Lock for writing to the queue
};

/**
* @brief Producer class that works across task boundaries.
*
Expand All @@ -16,54 +71,61 @@ namespace sensesp {
* in another.
*
* @tparam T
* @param consumer_app The app object in which the values should be consumed.
* @param queue_size Size of the queue.
* @param poll_rate How often to poll the queue. Note: in microseconds!
* @param consumer_event_loop The event loop in which the values should be
* consumed.
* @param poll_rate How often to poll the queue. Note: in microseconds! A value
* of 0 means that the queue will be polled on every tick.
*/
template <class T>
class TaskQueueProducer : public ObservableValue<T> {
public:
TaskQueueProducer(const T& value,
reactesp::EventLoop* consumer_app = event_loop(),
int queue_size = 1, unsigned int poll_rate = 990)
: ObservableValue<T>(value), queue_size_{queue_size} {
queue_ = xQueueCreate(queue_size, sizeof(T));
if (queue_ == NULL) {
ESP_LOGE(__FILENAME__, "Failed to create queue");
}

// Create a repeat event that will poll the queue and emit the values
consumer_app->onRepeatMicros(poll_rate, [this]() {
TaskQueueProducer(const T& value, reactesp::EventLoop* consumer_event_loop,
unsigned int poll_rate = 990)
: ObservableValue<T>(value) {
auto func = [this]() {
T value;
while (xQueueReceive(queue_, &value, 0) == pdTRUE) {
while (queue_.pop(value, 0)) {
this->emit(value);
}
});
};

// Create a repeat event that will poll the queue and emit the values
if (poll_rate == 0) {
consumer_event_loop->onTick(func);
} else {
consumer_event_loop->onRepeatMicros(poll_rate, func);
}
}

TaskQueueProducer(const T& value, int queue_size = 1,
unsigned int poll_rate = 990)
: TaskQueueProducer(value, event_loop(), queue_size,
poll_rate) {}
TaskQueueProducer(const T& value, unsigned int poll_rate = 990)
: TaskQueueProducer(value, event_loop(), poll_rate) {}

virtual void set(const T& value) override {
// WARNING: This does not check if the queue is full.
xQueueSend(queue_, &value, 0);
}
virtual void set(const T& value) override { queue_.push(value); }

int push(const T& value) {
int retval;
if (queue_size_ == 1) {
retval = xQueueOverwrite(queue_, &value);
} else {
retval = xQueueSend(queue_, &value, 0);
/**
* @brief Wait for a value to be available in the queue.
*
* This function will block until a value is available in the queue. When a
* value becomes available, it will be returned in the reference and
* emitted to the observers.
*
* @param value Received value if the function returns true.
* @param max_duration_ms Maximum duration to wait for the value.
* @return true Value was received successfully.
* @return false
*/
bool wait(T& value, unsigned int max_duration_ms) {
T received_value;
bool result = queue_.pop(received_value, max_duration_ms);
if (result) {
value = received_value;
this->emit(value);
}
return retval;
return result;
}

private:
int queue_size_;
QueueHandle_t queue_;
SafeQueue<T> queue_;
};

} // namespace sensesp
Expand Down

0 comments on commit c11a10f

Please sign in to comment.