diff --git a/src/sensesp/net/ws_client.h b/src/sensesp/net/ws_client.h index b28e62c1c..6ab301131 100644 --- a/src/sensesp/net/ws_client.h +++ b/src/sensesp/net/ws_client.h @@ -90,7 +90,7 @@ class WSClient : public Configurable, bool token_test_success_ = false; TaskQueueProducer connection_state_ = - TaskQueueProducer(WSConnectionState::kWSDisconnected); + TaskQueueProducer(WSConnectionState::kWSDisconnected, ReactESP::app); /// task_connection_state is used to track the internal task state which might /// be out of sync with the published connection state. @@ -100,7 +100,7 @@ class WSClient : public Configurable, WebSocketsClient client_; SKDeltaQueue* sk_delta_queue_; TaskQueueProducer delta_count_producer_ = - TaskQueueProducer(0, 5, 990); + TaskQueueProducer(0, ReactESP::app, 5, 990); SemaphoreHandle_t received_updates_semaphore_ = xSemaphoreCreateRecursiveMutex(); diff --git a/src/sensesp/system/task_queue_producer.h b/src/sensesp/system/task_queue_producer.h index fa00dabc1..e9b1e1473 100644 --- a/src/sensesp/system/task_queue_producer.h +++ b/src/sensesp/system/task_queue_producer.h @@ -1,6 +1,7 @@ #ifndef SENSESP_SYSTEM_TASK_QUEUE_PRODUCER_H_ #define SENSESP_SYSTEM_TASK_QUEUE_PRODUCER_H_ +#include "ReactESP.h" #include "observablevalue.h" namespace sensesp { @@ -14,13 +15,14 @@ namespace sensesp { * in another. * * @tparam T + * @param consumer_app The app object in which the values should be consumed. * @param queue_size Size of the queue. * @param poll_rate How often to poll the queue. Note: in microseconds! */ template class TaskQueueProducer : public ObservableValue { public: - TaskQueueProducer(const T& value, int queue_size = 1, + TaskQueueProducer(const T& value, reactesp::ReactESP* consumer_app = ReactESP::app, int queue_size = 1, unsigned int poll_rate = 990) : ObservableValue(value), queue_size_{queue_size} { queue_ = xQueueCreate(queue_size, sizeof(T)); @@ -29,7 +31,7 @@ class TaskQueueProducer : public ObservableValue { } // Create a repeat reaction that will poll the queue and emit the values - ReactESP::app->onRepeatMicros(poll_rate, [this]() { + consumer_app->onRepeatMicros(poll_rate, [this]() { T value; while (xQueueReceive(queue_, &value, 0) == pdTRUE) { this->emit(value); @@ -37,6 +39,9 @@ class TaskQueueProducer : public ObservableValue { }); } + TaskQueueProducer(const T& value, int queue_size = 1, unsigned int poll_rate = 990) + : TaskQueueProducer(value, ReactESP::app, queue_size, poll_rate) {} + bool set(const T& value) { int retval; if (queue_size_ == 1) {