Skip to content

Commit

Permalink
Merge pull request #627 from SignalK/task_queue_producer_return_val
Browse files Browse the repository at this point in the history
TaskQueueProducer changes
  • Loading branch information
mairas authored Sep 29, 2022
2 parents 27dba0d + a6891e4 commit 7e538f5
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 7 deletions.
4 changes: 2 additions & 2 deletions src/sensesp/net/ws_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class WSClient : public Configurable,
bool token_test_success_ = false;

TaskQueueProducer<WSConnectionState> connection_state_ =
TaskQueueProducer<WSConnectionState>(WSConnectionState::kWSDisconnected);
TaskQueueProducer<WSConnectionState>(WSConnectionState::kWSDisconnected, ReactESP::app);

/// task_connection_state is used to track the internal task state which might
/// be out of sync with the published connection state.
Expand All @@ -100,7 +100,7 @@ class WSClient : public Configurable,
WebSocketsClient client_;
SKDeltaQueue* sk_delta_queue_;
TaskQueueProducer<int> delta_count_producer_ =
TaskQueueProducer<int>(0, 5, 990);
TaskQueueProducer<int>(0, ReactESP::app, 5, 990);

SemaphoreHandle_t received_updates_semaphore_ =
xSemaphoreCreateRecursiveMutex();
Expand Down
20 changes: 15 additions & 5 deletions src/sensesp/system/task_queue_producer.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef SENSESP_SYSTEM_TASK_QUEUE_PRODUCER_H_
#define SENSESP_SYSTEM_TASK_QUEUE_PRODUCER_H_

#include "ReactESP.h"
#include "observablevalue.h"

namespace sensesp {
Expand All @@ -14,13 +15,14 @@ namespace sensesp {
* in another.
*
* @tparam T
* @param consumer_app The app object in which the values should be consumed.
* @param queue_size Size of the queue.
* @param poll_rate How often to poll the queue. Note: in microseconds!
*/
template <class T>
class TaskQueueProducer : public ObservableValue<T> {
public:
TaskQueueProducer(const T& value, int queue_size = 1,
TaskQueueProducer(const T& value, reactesp::ReactESP* consumer_app = ReactESP::app, int queue_size = 1,
unsigned int poll_rate = 990)
: ObservableValue<T>(value), queue_size_{queue_size} {
queue_ = xQueueCreate(queue_size, sizeof(T));
Expand All @@ -29,20 +31,28 @@ class TaskQueueProducer : public ObservableValue<T> {
}

// Create a repeat reaction that will poll the queue and emit the values
ReactESP::app->onRepeatMicros(poll_rate, [this]() {
consumer_app->onRepeatMicros(poll_rate, [this]() {
T value;
while (xQueueReceive(queue_, &value, 0) == pdTRUE) {
this->emit(value);
}
});
}

void set(const T& value) {
TaskQueueProducer(const T& value, int queue_size = 1, unsigned int poll_rate = 990)
: TaskQueueProducer(value, ReactESP::app, queue_size, poll_rate) {}

bool set(const T& value) {
int retval;
if (queue_size_ == 1) {
xQueueOverwrite(queue_, &value);
retval = xQueueOverwrite(queue_, &value);
} else {
xQueueSend(queue_, &value, 0);
retval = xQueueSend(queue_, &value, 0);
}
if (retval != pdTRUE) {
return false;
}
return true;
}

private:
Expand Down

0 comments on commit 7e538f5

Please sign in to comment.