Draft PR to get help with MQTT OTA update issue #4146

Draft: wants to merge 3 commits into base: main
2 changes: 2 additions & 0 deletions wled00/fcn_declare.h
@@ -347,6 +347,8 @@ void prepareHostname(char* hostname);
bool isAsterisksOnly(const char* str, byte maxLen);
bool requestJSONBufferLock(uint8_t module=255);
void releaseJSONBufferLock();
bool requestResponseBufferLock(uint8_t module = 255);
void releaseResponseBufferLock();
uint8_t extractModeName(uint8_t mode, const char *src, char *dest, uint8_t maxLen);
uint8_t extractModeSlider(uint8_t mode, uint8_t slider, char *dest, uint8_t maxLen, uint8_t *var = nullptr);
int16_t extractModeDefaults(uint8_t mode, const char *segVar);
216 changes: 216 additions & 0 deletions wled00/mqtt.cpp
@@ -1,4 +1,5 @@
#include "wled.h"
#include <HTTPClient.h>

/*
* MQTT communication protocol for home automation
@@ -7,6 +8,9 @@
#ifdef WLED_ENABLE_MQTT
#define MQTT_KEEP_ALIVE_TIME 60 // contact the MQTT broker every 60 seconds

// declaring this function up here
void otaUpdate(String url);

void parseMQTTBriPayload(char* payload)
{
if (strstr(payload, "ON") || strstr(payload, "on") || strstr(payload, "true")) {bri = briLast; stateUpdated(CALL_MODE_DIRECT_CHANGE);}
@@ -33,6 +37,9 @@ void onMqttConnect(bool sessionPresent)
strlcpy(subuf, mqttDeviceTopic, 33);
strcat_P(subuf, PSTR("/api"));
mqtt->subscribe(subuf, 0);
strlcpy(subuf, mqttDeviceTopic, 33);
strcat_P(subuf, PSTR("/update"));
mqtt->subscribe(subuf, 0);
}

if (mqttGroupTopic[0] != 0) {
@@ -58,6 +65,9 @@ void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties
DEBUG_PRINT(F("MQTT msg: "));
DEBUG_PRINTLN(topic);

// set connected to true. If we got a message, we must be connected (this fixes a lot of issues with AsyncMqttClient)
mqtt->setConnected(true); // note that setConnected is a function that I added to AsyncMqttClient
Collaborator:

This feels like a hack. Can you explain what issues you saw with the connected state?

Author:

I often received messages but wasn't able to send a response because the client said it was disconnected, even though it clearly was connected since it got the messages successfully. This fixed that issue for me. It definitely is a bit hacky though.

Collaborator:

FYI, I will veto this change (and the corresponding AsyncMqttClient change) as inappropriate unless it is proven (with traces) that the MQTT client isn't working correctly.


// paranoia check to avoid npe if no payload
if (payload==nullptr) {
DEBUG_PRINTLN(F("no payload -> leave"));
@@ -117,6 +127,63 @@ void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties
handleSet(nullptr, apireq);
}
releaseJSONBufferLock();
} else if (strcmp_P(topic, PSTR("/update")) == 0)
{
// get the json buffer lock
if (!requestJSONBufferLock(15))
{
delete[] payloadStr;
payloadStr = nullptr;
return;
}

// deserialize the request
deserializeJson(doc, payloadStr);
JsonObject obj = doc.as<JsonObject>();

// make sure the request has a url
if (!obj.containsKey("url") || !obj["url"].is<String>())
{
DEBUG_PRINTLN("No url in request, won't update. Returning.");
// release the json buffer lock
releaseJSONBufferLock();
// clear out the payload string
delete[] payloadStr;
payloadStr = nullptr;
return;
}

// get the url
String url = obj["url"].as<String>();

// request the response buffer lock
if (!requestResponseBufferLock())
{
DEBUG_PRINTLN("Failed to get response buffer lock, returning.");
// release the json buffer lock
releaseJSONBufferLock();
// clear out the payload string
delete[] payloadStr;
payloadStr = nullptr;
return;
}

// make the response
mqttResponseDoc["id"] = obj["id"];
mqttResponseDoc["update"] = "Starting update, do not power off the device.";
serializeJson(mqttResponseDoc, mqttResponseBuffer);

// release the json buffer lock
releaseJSONBufferLock();

// send the response
mqtt->publish(mqttResponseTopic, 1, false, mqttResponseBuffer);

// release the response buffer lock
releaseResponseBufferLock();

// do the update
return otaUpdate(url);
} else if (strlen(topic) != 0) {
// non standard topic, check with usermods
usermods.onMqttMessage(topic, payloadStr);
@@ -166,6 +233,14 @@ void publishMqtt()

bool initMqtt()
{
DEBUG_PRINTLN(F("Initializing MQTT"));
// set the important variables
mqttEnabled = true;
strlcpy(mqttServer, "server address here", MQTT_MAX_SERVER_LEN + 1);
Collaborator:

Surely these connection values are already defined as part of WLED, or you wouldn't be able to use it at all. No need to add a new compile-time value?

mqttPort = 1883;
strlcpy(mqttUser, "username here", 41);
strlcpy(mqttPass, "password here", 65);

if (!mqttEnabled || mqttServer[0] == 0 || !WLED_CONNECTED) return false;

if (mqtt == nullptr) {
@@ -195,4 +270,145 @@ bool initMqtt()
mqtt->connect();
return true;
}

void otaUpdate(String url)
{
DEBUG_PRINT(F("OTA update from URL: "));
DEBUG_PRINTLN(url);

// make client for HTTP request
HTTPClient http;
Collaborator:

Can this also handle HTTPS?

Author:

I haven't tested that out; it might be worth trying.

Collaborator:

Releases from GitHub are HTTPS only, so this is going to be needed for most users
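
Because release binaries are served over HTTPS only, `otaUpdate` would need to tell the two schemes apart before opening a connection. A minimal sketch of that check in plain C++, assuming a hypothetical `classifyUrl` helper that is not part of this PR; the comparison is case-sensitive for brevity. On the device, the HTTPS branch would presumably hand a TLS-capable client to `http.begin`, but that wiring is untested here and deliberately left out.

```cpp
#include <string_view>

// Hypothetical helper (not in the PR): which transport does this URL need?
enum class UrlScheme { Http, Https, Unsupported };

// True when `text` begins with `prefix`. substr clamps the length, so a
// text shorter than the prefix simply compares unequal.
constexpr bool startsWith(std::string_view text, std::string_view prefix) {
  return text.substr(0, prefix.size()) == prefix;
}

// Classify the firmware URL so the caller can pick a plain or TLS client,
// or reject the request before any download starts.
constexpr UrlScheme classifyUrl(std::string_view url) {
  if (startsWith(url, "https://")) return UrlScheme::Https;
  if (startsWith(url, "http://")) return UrlScheme::Http;
  return UrlScheme::Unsupported;
}
```

Rejecting unsupported schemes up front also gives a clear point at which to report an error, rather than failing later inside the download loop.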

http.begin(url);
http.setTimeout(3000000); // meant as a 5 minute timeout, but 3000000 ms is 50 minutes (5 minutes is 300000 ms); may change

// do a get request to get the update binary
int httpCode = http.GET();
// make sure the request was successful
if (httpCode != HTTP_CODE_OK)
{
DEBUG_PRINT(F("HTTP GET failed, code: "));
DEBUG_PRINTLN(httpCode);
http.end();
return;
}

// disable the watchdog
WLED::instance().disableWatchdog();
otaInProgress = true; // I've tried both with and without this, neither seems to work

// get the size of the update
int len = http.getSize();
DEBUG_PRINT(F("Update size: "));
DEBUG_PRINTLN(len);

// make a buffer for reading
WiFiClient *stream = http.getStreamPtr();

DEBUG_PRINTLN("Got stream");

// Initialize Update
if (!Update.begin(len))
{
DEBUG_PRINTLN(F("Update.begin failed, most likely not enough space"));
http.end();
WLED::instance().enableWatchdog();
otaInProgress = false;
return;
}

DEBUG_PRINTLN("Update.begin succeeded");
DEBUG_PRINTLN("Is the stream null?");
DEBUG_PRINTLN(stream == nullptr);
DEBUG_PRINT(F("Free Heap: "));
DEBUG_PRINTLN(ESP.getFreeHeap());

// write the update to the device
size_t written = 0;
int bufferSize = 512;
uint8_t buffer[bufferSize];
// size_t written = Update.writeStream(*stream);
while (http.connected() && written < len)
{
if (stream->available())
{
int bytesRead = stream->readBytes(buffer, bufferSize);
if (bytesRead == 0)
{
DEBUG_PRINTLN("No bytes read");
}
written += Update.write(buffer, bytesRead);
DEBUG_PRINT("Bytes written: ");
DEBUG_PRINTLN(written);
if (ESP.getFreeHeap() < 80000)
{
DEBUG_PRINT(F("Free Heap below 80000: "));
DEBUG_PRINTLN(ESP.getFreeHeap());
}
if (http.connected() != 1)
{
DEBUG_PRINT("http Connection status: ");
DEBUG_PRINTLN(http.connected());
}
if (WiFi.status() != 3)
{
DEBUG_PRINT("Wifi status: ");
DEBUG_PRINTLN(WiFi.status());
}
}
else
{
DEBUG_PRINTLN("No bytes available");
}
delay(10);
}

DEBUG_PRINTLN("Wrote stream");

// check if the update was successful
if (written == len)
{
DEBUG_PRINTLN(F("Written to flash successfully"));
}
else
{
DEBUG_PRINT(F("Update failed, only wrote : "));
Collaborator:

Most users will not be running a debug build, so I think that at least the key messages should also be sent via MQTT.

It looks like you started doing this and then just swapped to debug only.

Author:

That's fair. I wanted to avoid sending mqtt messages at any point in this process until I got it working first just to make sure that isn't causing any issues.
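
One way to make the key messages reach users without a debug build is to route them through a small reporter that takes a publish callback. The following is a sketch only: `OtaStatusReporter`, its method names, and the message wording are hypothetical, and on the device the callback would wrap the existing `mqtt->publish` call.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>

// Hypothetical reporter (not in the PR): forwards key OTA status messages
// to whatever publish function the caller supplies.
class OtaStatusReporter {
 public:
  using PublishFn =
      std::function<void(const std::string& topic, const std::string& payload)>;

  OtaStatusReporter(std::string topic, PublishFn publish)
      : topic_(std::move(topic)), publish_(std::move(publish)) {
    assert(publish_ && "a publish callback is required");
  }

  // Free-form progress message, e.g. "Starting update".
  void info(const std::string& message) const { publish_(topic_, message); }

  // Failure report for the case where fewer bytes were written than expected.
  void shortWrite(std::size_t written, std::size_t expected) const {
    publish_(topic_, "Update failed, only wrote " + std::to_string(written) +
                         " of " + std::to_string(expected) + " bytes");
  }

 private:
  std::string topic_;
  PublishFn publish_;
};
```

Keeping the callback injectable also lets the update path be exercised with MQTT reporting switched off (a no-op callback), which matches the wish to rule out MQTT as a cause while debugging.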

DEBUG_PRINT(written);
DEBUG_PRINTLN(F(" bytes"));
http.end();
WLED::instance().enableWatchdog();
otaInProgress = false;
return;
}

// End the update process
if (Update.end())
{
if (Update.isFinished())
{
DEBUG_PRINTLN(F("Update finished successfully, restarting now"));
http.end();
delay(1000);
ESP.restart();
}
else
{
DEBUG_PRINTLN(F("Update not finished, something went wrong!"));
}
}
else
{
DEBUG_PRINT(F("OTA Error Occurred. Error: "));
DEBUG_PRINT(Update.errorString());
DEBUG_PRINT(" Code: ");
DEBUG_PRINTLN(Update.getError());
}

// reenable the watchdog
WLED::instance().enableWatchdog();
// end the http request
http.end();
// set ota in progress to false
otaInProgress = false;
}

#endif
5 changes: 5 additions & 0 deletions wled00/src/dependencies/async-mqtt-client/AsyncMqttClient.cpp
@@ -875,3 +875,8 @@ uint16_t AsyncMqttClient::publish(const char* topic, uint8_t qos, bool retain, c
return 1;
}
}

void AsyncMqttClient::setConnected(bool connected)
{
_connected = connected;
}
@@ -77,6 +77,7 @@ class AsyncMqttClient {
uint16_t subscribe(const char* topic, uint8_t qos);
uint16_t unsubscribe(const char* topic);
uint16_t publish(const char* topic, uint8_t qos, bool retain, const char* payload = nullptr, size_t length = 0, bool dup = false, uint16_t message_id = 0);
void setConnected(bool connected);

private:
AsyncClient _client;
33 changes: 33 additions & 0 deletions wled00/util.cpp
@@ -230,6 +230,39 @@ void releaseJSONBufferLock()
jsonBufferLock = 0;
}

// the same functions but for my new response buffers
bool requestResponseBufferLock(uint8_t module)
{
unsigned long now = millis();

while (responseBufferLock && millis() - now < 1000)
delay(1); // wait for a second for buffer lock

if (millis() - now >= 1000)
{
DEBUG_PRINT(F("ERROR: Locking response buffer failed! ("));
DEBUG_PRINT(responseBufferLock);
DEBUG_PRINTLN(")");
return false; // timed out while waiting
}

responseBufferLock = module ? module : 255;
DEBUG_PRINT(F("Response buffer locked. ("));
DEBUG_PRINT(responseBufferLock);
DEBUG_PRINTLN(")");
mqttResponseDoc.clear(); // Clear the JSON document
memset(mqttResponseBuffer, 0, JSON_BUFFER_SIZE); // Clear the buffer
return true;
}

void releaseResponseBufferLock()
{
DEBUG_PRINT(F("Response buffer released. ("));
DEBUG_PRINT(responseBufferLock);
DEBUG_PRINTLN(")");
responseBufferLock = 0;
}


// extracts effect mode (or palette) name from names serialized string
// caller must provide large enough buffer for name (including SR extensions)!
11 changes: 9 additions & 2 deletions wled00/wled.cpp
@@ -34,6 +34,11 @@ void WLED::reset()

void WLED::loop()
{
if (otaInProgress)
{
// stop the loop while ota in progress
return;
Collaborator:

I suspect it's this change that is causing the issues with the MQTT client's connection state.

This feels too heavy-handed.

Collaborator:

I concur.

Author:

This is unrelated to the connection state issue, I had those issues well before I tried adding this.

}
#ifdef WLED_DEBUG
static unsigned long lastRun = 0;
unsigned long loopMillis = millis();
@@ -461,8 +466,10 @@ pinManager.allocateMultiplePins(pins, sizeof(pins)/sizeof(managed_pin_type), Pin
// fill in unique mdns default
if (strcmp(cmDNS, "x") == 0) sprintf_P(cmDNS, PSTR("wled-%*s"), 6, escapedMac.c_str() + 6);
#ifndef WLED_DISABLE_MQTT
if (mqttDeviceTopic[0] == 0) sprintf_P(mqttDeviceTopic, PSTR("wled/%*s"), 6, escapedMac.c_str() + 6);
if (mqttClientID[0] == 0) sprintf_P(mqttClientID, PSTR("WLED-%*s"), 6, escapedMac.c_str() + 6);
if (mqttDeviceTopic[0] == 0) sprintf_P(mqttDeviceTopic, PSTR("lights/%*s"), 6, escapedMac.c_str() + 6);
Collaborator:

Looks like changes from your own closed-source code are leaking in here. Please revert.

if (mqttClientID[0] == 0) sprintf_P(mqttClientID, PSTR("LIGHTS-%*s"), 6, escapedMac.c_str() + 6);
if (mqttResponseTopic[0] == 0)
snprintf_P(mqttResponseTopic, 36, PSTR("%s/r"), mqttDeviceTopic);
#endif

#ifdef WLED_ENABLE_ADALIGHT
11 changes: 9 additions & 2 deletions wled00/wled.h
@@ -261,6 +261,7 @@ WLED_GLOBAL char versionString[] _INIT(TOSTRING(WLED_VERSION));
// AP and OTA default passwords (for maximum security change them!)
WLED_GLOBAL char apPass[65] _INIT(WLED_AP_PASS);
WLED_GLOBAL char otaPass[33] _INIT(DEFAULT_OTA_PASS);
WLED_GLOBAL bool otaInProgress _INIT(false);

// Hardware and pin config
#ifndef BTNPIN
@@ -432,12 +433,13 @@ WLED_GLOBAL unsigned long lastMqttReconnectAttempt _INIT(0); // used for other
#define MQTT_MAX_TOPIC_LEN 32
#endif
#ifndef MQTT_MAX_SERVER_LEN
#define MQTT_MAX_SERVER_LEN 32
#define MQTT_MAX_SERVER_LEN 52
#endif
WLED_GLOBAL AsyncMqttClient *mqtt _INIT(NULL);
WLED_GLOBAL bool mqttEnabled _INIT(false);
WLED_GLOBAL char mqttStatusTopic[40] _INIT(""); // this must be global because of async handlers
WLED_GLOBAL char mqttDeviceTopic[MQTT_MAX_TOPIC_LEN+1] _INIT(""); // main MQTT topic (individual per device, default is wled/mac)
WLED_GLOBAL char mqttDeviceTopic[MQTT_MAX_TOPIC_LEN+1] _INIT(""); // main MQTT topic (individual per device, default is lights/mac)
Collaborator:

Unrelated changes

WLED_GLOBAL char mqttResponseTopic[MQTT_MAX_TOPIC_LEN + 1] _INIT(""); // MQTT response topic (individual per device, default is lights/mac/r)
Collaborator:

Best practice for service-like calls in a pub-sub framework is to accept the response topic as part of the request. Apart from removing any possibility of a misconfigured server, it allows the client to specify a unique ephemeral topic for each transaction, ensuring that you don't ever accidentally receive a reply from some other client's service call.
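
A minimal sketch of the request-supplied response topic described above, in plain C++. The `responseTopic` field name, the `UpdateRequest` struct, and the fallback to a configured topic are all assumptions made for illustration; the PR itself only has a fixed `mqttResponseTopic`.

```cpp
#include <cassert>
#include <string>

// Hypothetical, already-parsed form of the /update request.
struct UpdateRequest {
  std::string url;
  std::string responseTopic;  // assumed field; empty when the client sent none
};

// MQTT wildcards ('+' and '#') are valid only in subscriptions, never in a
// topic that is being published to, so such values are rejected.
bool isPublishableTopic(const std::string& topic) {
  return !topic.empty() && topic.find_first_of("+#") == std::string::npos;
}

// Prefer the per-request topic; otherwise fall back to the configured one.
std::string resolveResponseTopic(const UpdateRequest& request,
                                 const std::string& fallbackTopic) {
  assert(isPublishableTopic(fallbackTopic));
  return isPublishableTopic(request.responseTopic) ? request.responseTopic
                                                   : fallbackTopic;
}
```

With this shape, a client can use a unique topic per transaction, for example `clients/42/reply/7`, and ignore everything else, while older clients that send no topic keep working through the fallback.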

WLED_GLOBAL char mqttGroupTopic[MQTT_MAX_TOPIC_LEN+1] _INIT("wled/all"); // second MQTT topic (for example to group devices)
WLED_GLOBAL char mqttServer[MQTT_MAX_SERVER_LEN+1] _INIT(""); // both domains and IPs should work (no SSL)
WLED_GLOBAL char mqttUser[41] _INIT(""); // optional: username for MQTT auth
@@ -757,6 +759,11 @@ WLED_GLOBAL int8_t spi_sclk _INIT(SPISCLKPIN);
WLED_GLOBAL StaticJsonDocument<JSON_BUFFER_SIZE> doc;
WLED_GLOBAL volatile uint8_t jsonBufferLock _INIT(0);

// global buffers for mqtt responses
WLED_GLOBAL StaticJsonDocument<JSON_BUFFER_SIZE> mqttResponseDoc;
Collaborator:

Is this really the first time we are sending a response via MQTT?

Collaborator:

MQTT can receive JSON but is not responding in JSON. I would still re-use the existing global buffer if possible.

This is a heavy toll on available RAM.

Collaborator:

Concur. There's no need for a second buffer -- the JSON response is being generated after the incoming message is processed, so the buffer can be cleared and re-used without releasing the lock.
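
The single-buffer flow described above can be sketched in plain C++ as follows. This is an illustration under stated assumptions, not WLED code: `SharedDoc` stands in for the global JSON document and its lock, a `std::map` stands in for the parsed JSON, and `serialize` is a toy key=value writer.

```cpp
#include <cassert>
#include <map>
#include <string>

using Fields = std::map<std::string, std::string>;

// Stand-in for the one shared JSON document and its lock flag.
struct SharedDoc {
  bool locked = false;
  Fields fields;
};

bool requestLock(SharedDoc& doc) {
  if (doc.locked) return false;  // the real code waits up to a timeout here
  doc.locked = true;
  doc.fields.clear();
  return true;
}

void releaseLock(SharedDoc& doc) {
  assert(doc.locked);
  doc.locked = false;
}

// Toy serializer: "key=value;" pairs in key order.
std::string serialize(const SharedDoc& doc) {
  std::string out;
  for (const auto& kv : doc.fields) out += kv.first + "=" + kv.second + ";";
  return out;
}

// Parse the request and build the response in the same document, under a
// single lock that is held from start to finish.
bool handleUpdate(SharedDoc& doc, const Fields& incoming,
                  std::string& urlOut, std::string& responseOut) {
  if (!requestLock(doc)) return false;
  doc.fields = incoming;  // stands in for deserializing the payload
  const auto url = doc.fields.find("url");
  if (url == doc.fields.end()) {
    releaseLock(doc);
    return false;
  }
  // Copy what is needed out of the request before the document is cleared.
  urlOut = url->second;
  const auto idIt = doc.fields.find("id");
  const std::string id = (idIt != doc.fields.end()) ? idIt->second : "";
  doc.fields.clear();  // same document, still locked, now holds the response
  doc.fields["id"] = id;
  doc.fields["update"] = "Starting update";
  responseOut = serialize(doc);
  releaseLock(doc);
  return true;
}
```

The important detail is the copy step: values such as the URL and the request id must be copied out before the document is cleared, because anything still referring into the document is invalid afterwards.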

WLED_GLOBAL char mqttResponseBuffer[JSON_BUFFER_SIZE] _INIT("");
WLED_GLOBAL volatile uint8_t responseBufferLock _INIT(0);

// enable additional debug output
#if defined(WLED_DEBUG_HOST)
#include "net_debug.h"