Skip to content

Commit

Permalink
ADD MQTT retain feature
Browse files Browse the repository at this point in the history
  • Loading branch information
fgalan committed Aug 30, 2023
1 parent 2f74717 commit 71859c3
Show file tree
Hide file tree
Showing 17 changed files with 1,446 additions and 12 deletions.
3 changes: 2 additions & 1 deletion CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
- Add: servicePath field to builtin attributes (#2877)
- Add: notification.mqtt.retain and notification.mqttCustom.retain flag for MQTT retain in notifications (#4388)
- Fix: logDeprecate not working correctly (`geo:json` wrongly considered as deprecated)
- Fix: improve error traces (#4387)
- Add: CLI parameter -dbUri / env var ORION_MONGO_URI (#3794)
- Fix: improve logs in MongoDB query logic
- Fix: improve logs in MongoDB query logic
5 changes: 3 additions & 2 deletions doc/manuals/admin/database_model.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,9 @@ Fields:
is updated each time a notification is sent, to avoid violating throttling.
- **throttling**: minimum interval between notifications. 0 or -1 means no throttling.
- **reference**: the URL for notifications, either HTTP or MQTT
- **mqttTopic**: MQTT topic (only in MQTT notifications)
- **mqttQoS**: MQTT QoS value (only in MQTT notifications)
- **topic**: MQTT topic (only in MQTT notifications)
- **qos**: MQTT QoS value (only in MQTT notifications)
- **retain**: MQTT retain value (only in MQTT notifications)
- **entities**: an array of entities (mandatory). The JSON for each
entity contains **id**, **type**, **isPattern** and **isTypePattern**. Note that,
due to legacy reasons, **isPattern** may be `"true"` or `"false"` (text) while
Expand Down
2 changes: 2 additions & 0 deletions doc/manuals/orion-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -3713,6 +3713,7 @@ A `mqtt` object contains the following subfields:
| `url` | | string | Represent the MQTT broker endpoint to use. URL must start with `mqtt://` and never contains a path (it only includes host and port) |
| `topic` | | string | Represent the MQTT topic to use |
| `qos` || number | MQTT QoS value to use in the notifications associated to the subscription (0, 1 or 2). If omitted then QoS 0 is used. |
| `retain` || boolean | MQTT retain value to use in the notifications associated to the subscription (`true` or `false`). If omitted then retain `false` is used. |
| `user` || string | User name used to authenticate the connection with the broker. |
| `passwd` || string | Passphrase for the broker authentication. It is always obfuscated when retrieving subscription information (e.g. `GET /v2/subscriptions`). |

Expand Down Expand Up @@ -3746,6 +3747,7 @@ A `mqttCustom` object contains the following subfields.
| `url` | | string | Represent the MQTT broker endpoint to use. URL must start with `mqtt://` and never contains a path (it only includes host and port) |
| `topic` | | string | Represent the MQTT topic to use. Macro replacement is also performed for this field (i.e: a topic based on an attribute ) |
| `qos` || number | MQTT QoS value to use in the notifications associated to the subscription (0, 1 or 2). If omitted then QoS 0 is used. |
| `retain` || boolean | MQTT retain value to use in the notifications associated to the subscription (`true` or `false`). If omitted then retain `false` is used. |
| `user` || string | User name used to authenticate the connection with the broker. |
| `passwd` || string | Passphrase for the broker authentication. It is always obfuscated when retrieving subscription information (e.g. `GET /v2/subscriptions`). |
| `payload` || string | Text-based payload to be used in notifications. In case of empty string or omitted, the default payload (see [Notification Messages](#notification-messages) sections) is used. If `null`, notification will not include any payload. |
Expand Down
4 changes: 3 additions & 1 deletion doc/manuals/user/mqtt_notifications.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ The following elements can be used within `mqtt`:
* `topic` to specify the MQTT topic to use
* `qos`: to specify the MQTT QoS value to use in the notifications associated to the subscription
(0, 1 or 2). This is an optional field, if omitted then QoS 0 is used.
* `retain`: to specify the MQTT retain value to use in the notifications associated to the subscription
(`true` or `false`). This is an optional field, if omitted then QoS `false` is used.
* `user` and `passwd`: optional fields, to be used in the case MQTT broker needs user/password based
authentication. If used, both fields have to be used together. Note that for security reasons,
the password is always offuscated when retrieving subscription information (e.g. `GET /v2/subscriptions`).
Expand All @@ -52,7 +54,7 @@ in MQTT subscriptions work the same as in HTTP subscriptions, taking into accoun
* `mqttCustom` is used instead of `httpCustom`
* The same fields used in `mqtt` can be used in `mqttCustom`.
* `headers`, `qs` and `method`cannot be used, as they doesn’t have equivalence in MQTT
* Macro replacement is performed in `topic` and `payload` fields. `url`, `qos`, `user` and `passwd` are fixed values
* Macro replacement is performed in `topic` and `payload` fields. `url`, `qos`, `retain`, `user` and `passwd` are fixed values

## Connection management

Expand Down
3 changes: 3 additions & 0 deletions src/lib/apiTypesV2/MqttInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ std::string MqttInfo::toJson()
jh.addString("url", this->url);
jh.addString("topic", this->topic);
jh.addNumber("qos", (long long) this->qos);
jh.addBool("retain", this->retain);

if (providedAuth)
{
Expand Down Expand Up @@ -117,6 +118,7 @@ void MqttInfo::fill(const orion::BSONObj& bo)
this->url = bo.hasField(CSUB_REFERENCE)? getStringFieldF(bo, CSUB_REFERENCE) : "";
this->topic = bo.hasField(CSUB_MQTTTOPIC)? getStringFieldF(bo, CSUB_MQTTTOPIC) : "";
this->qos = bo.hasField(CSUB_MQTTQOS)? getIntFieldF(bo, CSUB_MQTTQOS) : 0;
this->retain = bo.hasField(CSUB_MQTTRETAIN)? getBoolFieldF(bo, CSUB_MQTTRETAIN) : false;
this->custom = bo.hasField(CSUB_CUSTOM)? getBoolFieldF(bo, CSUB_CUSTOM) : false;

// both user and passwd have to be used at the same time
Expand Down Expand Up @@ -228,6 +230,7 @@ void MqttInfo::fill(const MqttInfo& _mqttInfo)
this->url = _mqttInfo.url;
this->topic = _mqttInfo.topic;
this->qos = _mqttInfo.qos;
this->retain = _mqttInfo.retain;
this->custom = _mqttInfo.custom;
this->payload = _mqttInfo.payload;
this->payloadType = _mqttInfo.payloadType;
Expand Down
1 change: 1 addition & 0 deletions src/lib/apiTypesV2/MqttInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ struct MqttInfo
std::string url;
std::string topic;
unsigned int qos; // 0, 1 or 2
bool retain;

bool custom;
std::string payload; // either payload, json or ngsi is used (depending on payloadType)
Expand Down
39 changes: 39 additions & 0 deletions src/lib/jsonParseV2/parseSubscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,31 @@ static std::string parseMqttQoS(ConnectionInfo* ciP, SubscriptionUpdate* subsP,



/* ****************************************************************************
*
* parseMqttRetain -
*/
static std::string parseMqttRetain(ConnectionInfo* ciP, SubscriptionUpdate* subsP, const Value& mqtt)
{
Opt<bool> retainOpt = getBoolOpt(mqtt, "retain");
if (!retainOpt.ok())
{
return badInput(ciP, retainOpt.error);
}
if (retainOpt.given)
{
subsP->notification.mqttInfo.retain = retainOpt.value;
}
else
{
subsP->notification.mqttInfo.retain = 0;
}

return "";
}



/* ****************************************************************************
*
* parseMqttTopic -
Expand Down Expand Up @@ -1035,6 +1060,13 @@ static std::string parseNotification(ConnectionInfo* ciP, SubscriptionUpdate* su
return r;
}

// retain
r = parseMqttRetain(ciP, subsP, mqtt);
if (!r.empty())
{
return r;
}

// topic
r = parseMqttTopic(ciP, subsP, mqtt);
if (!r.empty())
Expand Down Expand Up @@ -1076,6 +1108,13 @@ static std::string parseNotification(ConnectionInfo* ciP, SubscriptionUpdate* su
return r;
}

// retain
r = parseMqttRetain(ciP, subsP, mqttCustom);
if (!r.empty())
{
return r;
}

// topic (same as in not custom mqtt)
r = parseMqttTopic(ciP, subsP, mqttCustom);
if (!r.empty())
Expand Down
10 changes: 6 additions & 4 deletions src/lib/mongoBackend/MongoCommonSubscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,14 @@ void setNotificationInfo(const Subscription& sub, orion::BSONObjBuilder* b)
b->append(CSUB_REFERENCE, sub.notification.mqttInfo.url);
b->append(CSUB_MQTTTOPIC, sub.notification.mqttInfo.topic);
b->append(CSUB_MQTTQOS, (int) sub.notification.mqttInfo.qos);
b->append(CSUB_MQTTRETAIN, sub.notification.mqttInfo.retain);
b->append(CSUB_CUSTOM, sub.notification.mqttInfo.custom);

LM_T(LmtMongo, ("Subscription reference: %s", sub.notification.mqttInfo.url.c_str()));
LM_T(LmtMongo, ("Subscription mqttTopic: %s", sub.notification.mqttInfo.topic.c_str()));
LM_T(LmtMongo, ("Subscription mqttQos: %d", sub.notification.mqttInfo.qos));
LM_T(LmtMongo, ("Subscription custom: %s", sub.notification.mqttInfo.custom? "true" : "false"));
LM_T(LmtMongo, ("Subscription reference: %s", sub.notification.mqttInfo.url.c_str()));
LM_T(LmtMongo, ("Subscription mqttTopic: %s", sub.notification.mqttInfo.topic.c_str()));
LM_T(LmtMongo, ("Subscription mqttQos: %d", sub.notification.mqttInfo.qos));
LM_T(LmtMongo, ("Subscription mqttRetain: %s", sub.notification.mqttInfo.retain? "true": "false"));
LM_T(LmtMongo, ("Subscription custom: %s", sub.notification.mqttInfo.custom? "true" : "false"));

if (sub.notification.mqttInfo.providedAuth)
{
Expand Down
1 change: 1 addition & 0 deletions src/lib/mongoBackend/dbConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@

#define CSUB_MQTTTOPIC "topic"
#define CSUB_MQTTQOS "qos"
#define CSUB_MQTTRETAIN "retain"

#define CSUB_USER "user"
#define CSUB_PASSWD "passwd"
Expand Down
1 change: 1 addition & 0 deletions src/lib/mongoBackend/mongoUpdateSubscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ void setNotificationInfo(const Subscription& sub, orion::BSONObjBuilder* setB, o
{
unsetB->append(CSUB_MQTTTOPIC, 1);
unsetB->append(CSUB_MQTTQOS, 1);
unsetB->append(CSUB_MQTTRETAIN, 1);
unsetB->append(CSUB_USER, 1);
unsetB->append(CSUB_PASSWD, 1);

Expand Down
4 changes: 2 additions & 2 deletions src/lib/mqtt/MqttConnectionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ MqttConnection* MqttConnectionManager::getConnection(const std::string& host, in
*
* MqttConnectionManager::sendMqttNotification -
*/
bool MqttConnectionManager::sendMqttNotification(const std::string& host, int port, const std::string& user, const std::string& passwd, const std::string& content, const std::string& topic, unsigned int qos)
bool MqttConnectionManager::sendMqttNotification(const std::string& host, int port, const std::string& user, const std::string& passwd, const std::string& content, const std::string& topic, unsigned int qos, bool retain)
{
std::string endpoint = getEndpoint(host, port);

Expand Down Expand Up @@ -394,7 +394,7 @@ bool MqttConnectionManager::sendMqttNotification(const std::string& host, int po
int id;

bool retval;
int resultCode = mosquitto_publish(mosq, &id, topic.c_str(), (int) strlen(msg), msg, qos, false);
int resultCode = mosquitto_publish(mosq, &id, topic.c_str(), (int) strlen(msg), msg, qos, retain);
if (resultCode != MOSQ_ERR_SUCCESS)
{
retval = false;
Expand Down
2 changes: 1 addition & 1 deletion src/lib/mqtt/MqttConnectionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class MqttConnectionManager

const char* semGet(void);

bool sendMqttNotification(const std::string& host, int port, const std::string& user, const std::string& passwd, const std::string& content, const std::string& topic, unsigned int qos);
bool sendMqttNotification(const std::string& host, int port, const std::string& user, const std::string& passwd, const std::string& content, const std::string& topic, unsigned int qos, bool retain);
void cleanup(double maxAge);

private:
Expand Down
2 changes: 2 additions & 0 deletions src/lib/ngsiNotify/Notifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ static SenderThreadParams* buildSenderParamsCustom
paramsP->registration = false;
paramsP->subscriptionId = subscriptionId.get();
paramsP->qos = notification.mqttInfo.qos; // unspecified in case of HTTP notifications
paramsP->retain = notification.mqttInfo.retain; // unspecified in case of HTTP notifications
paramsP->timeout = notification.httpInfo.timeout; // unspecified in case of MQTT notifications
paramsP->user = notification.mqttInfo.user; // unspecified in case of HTTP notifications
paramsP->passwd = notification.mqttInfo.passwd; // unspecified in case of HTTP notifications
Expand Down Expand Up @@ -714,6 +715,7 @@ SenderThreadParams* Notifier::buildSenderParams
paramsP->subscriptionId = ncr.subscriptionId.get();
paramsP->registration = false;
paramsP->qos = notification.mqttInfo.qos; // unspecified in case of HTTP notifications
paramsP->retain = notification.mqttInfo.retain; // unspecified in case of HTTP notifications
paramsP->timeout = notification.httpInfo.timeout; // unspecified in case of MQTT notifications
paramsP->user = notification.mqttInfo.user; // unspecified in case of HTTP notifications
paramsP->passwd = notification.mqttInfo.passwd; // unspecified in case of HTTP notifications
Expand Down
2 changes: 1 addition & 1 deletion src/lib/ngsiNotify/doNotify.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ static void doNotifyMqtt(SenderThreadParams* params)

// Note that we use in subNotificationErrorStatus() statusCode -1 and failureReson "" to avoid using
// lastFailureReason and lastSuccessCode in MQTT notifications (they don't have sense in this case)
if (mqttMgr.sendMqttNotification(params->ip, params->port, params->user, params->passwd, params->content, params->resource, params->qos))
if (mqttMgr.sendMqttNotification(params->ip, params->port, params->user, params->passwd, params->content, params->resource, params->qos, params->retain))
{
// MQTT transaction is logged only in the case it was actually published. Upon successful publishing
// mqttOnPublishCallback is called (by the moment we are not doing nothing there, just printing in
Expand Down
1 change: 1 addition & 0 deletions src/lib/ngsiNotify/senderThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ typedef struct SenderThreadParams
std::string xauthToken;
std::string resource; // path for HTTP notifications, topic for MQTT notifications
unsigned int qos; // used only in MQTT notifications
unsigned int retain; // used only in MQTT notifications
std::string user; // for user/pass auth connections (only MQTT at the present moment)
std::string passwd; // for user/pass auth connections (only MQTT at the present moment)
std::string content_type;
Expand Down
Loading

0 comments on commit 71859c3

Please sign in to comment.