Skip to content

Commit

Permalink
Fix: switch context when handling AC charger MQTT messages
Browse files Browse the repository at this point in the history
MQTT message callbacks are executed in the MQTT thread context. when
processing topics that control the huawei AC charger, we must avoid
executing methods that are not thread-safe. this change bound the
methods to be called to the respective parameters and executes them
in the TaskScheduler context, such that they no longer need to be
thread-safe.
  • Loading branch information
schlimmchen committed Dec 30, 2023
1 parent 08bc181 commit 828128a
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 16 deletions.
8 changes: 8 additions & 0 deletions include/MqttHandleHuawei.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
#include <Huawei_can.h>
#include <espMqttClient.h>
#include <TaskSchedulerDeclarations.h>
#include <mutex>
#include <deque>
#include <functional>

class MqttHandleHuaweiClass {
public:
Expand All @@ -19,6 +22,11 @@ class MqttHandleHuaweiClass {
uint32_t _lastPublishStats;
uint32_t _lastPublish;

// MQTT callbacks to process updates on subscribed topics are executed in
// the MQTT thread's context. we use this queue to switch processing the
// user requests into the main loop's context (TaskScheduler context).
mutable std::mutex _mqttMutex;
std::deque<std::function<void()>> _mqttCallbacks;
};

extern MqttHandleHuaweiClass MqttHandleHuawei;
37 changes: 21 additions & 16 deletions src/MqttHandleHuawei.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,21 @@ void MqttHandleHuaweiClass::init(Scheduler& scheduler)

void MqttHandleHuaweiClass::loop()
{
if (!MqttSettings.getConnected() ) {
const CONFIG_T& config = Configuration.get();

std::unique_lock<std::mutex> mqttLock(_mqttMutex);

if (!config.Huawei.Enabled) {
_mqttCallbacks.clear();
return;
}

const CONFIG_T& config = Configuration.get();
for (auto& callback : _mqttCallbacks) { callback(); }
_mqttCallbacks.clear();

if (!config.Huawei.Enabled) {
mqttLock.unlock();

if (!MqttSettings.getConnected() ) {
return;
}

Expand Down Expand Up @@ -82,11 +90,6 @@ void MqttHandleHuaweiClass::onMqttMessage(const espMqttClientTypes::MessagePrope
{
const CONFIG_T& config = Configuration.get();

// ignore messages if Huawei is disabled
if (!config.Huawei.Enabled) {
return;
}

char token_topic[MQTT_MAX_TOPIC_STRLEN + 40]; // respect all subtopics
strncpy(token_topic, topic, MQTT_MAX_TOPIC_STRLEN + 40); // convert const char* to char*

Expand All @@ -108,46 +111,48 @@ void MqttHandleHuaweiClass::onMqttMessage(const espMqttClientTypes::MessagePrope
float payload_val = strtof(strlimit, NULL);
delete[] strlimit;

std::lock_guard<std::mutex> mqttLock(_mqttMutex);

if (!strcmp(setting, TOPIC_SUB_LIMIT_ONLINE_VOLTAGE)) {
// Set voltage limit
MessageOutput.printf("Limit Voltage: %f V\r\n", payload_val);
HuaweiCan.setValue(payload_val, HUAWEI_ONLINE_VOLTAGE);
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue, &HuaweiCan, payload_val, HUAWEI_ONLINE_VOLTAGE));

} else if (!strcmp(setting, TOPIC_SUB_LIMIT_OFFLINE_VOLTAGE)) {
// Set current limit
MessageOutput.printf("Offline Limit Voltage: %f V\r\n", payload_val);
HuaweiCan.setValue(payload_val, HUAWEI_OFFLINE_VOLTAGE);
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue, &HuaweiCan, payload_val, HUAWEI_OFFLINE_VOLTAGE));

} else if (!strcmp(setting, TOPIC_SUB_LIMIT_ONLINE_CURRENT)) {
// Set current limit
MessageOutput.printf("Limit Current: %f A\r\n", payload_val);
HuaweiCan.setValue(payload_val, HUAWEI_ONLINE_CURRENT);
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue, &HuaweiCan, payload_val, HUAWEI_ONLINE_CURRENT));

} else if (!strcmp(setting, TOPIC_SUB_LIMIT_OFFLINE_CURRENT)) {
// Set current limit
MessageOutput.printf("Offline Limit Current: %f A\r\n", payload_val);
HuaweiCan.setValue(payload_val, HUAWEI_OFFLINE_CURRENT);
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue, &HuaweiCan, payload_val, HUAWEI_OFFLINE_CURRENT));

} else if (!strcmp(setting, TOPIC_SUB_MODE)) {
// Control power on/off
if(payload_val == 3) {
MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Full internal control");
HuaweiCan.setMode(HUAWEI_MODE_AUTO_INT);
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode, &HuaweiCan, HUAWEI_MODE_AUTO_INT));
}

if(payload_val == 2) {
MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Internal on/off control, external power limit");
HuaweiCan.setMode(HUAWEI_MODE_AUTO_EXT);
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode, &HuaweiCan, HUAWEI_MODE_AUTO_EXT));
}

if(payload_val == 1) {
MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Turned ON");
HuaweiCan.setMode(HUAWEI_MODE_ON);
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode, &HuaweiCan, HUAWEI_MODE_ON));
}

if(payload_val == 0) {
MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Turned OFF");
HuaweiCan.setMode(HUAWEI_MODE_OFF);
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode, &HuaweiCan, HUAWEI_MODE_OFF));
}
}
}

0 comments on commit 828128a

Please sign in to comment.