Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add persistent storage for MQTT state #311

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,38 @@ doxygen docs/doxygen/config.doxyfile

See [CONTRIBUTING.md](./.github/CONTRIBUTING.md) for information on
contributing.

## Using Setter and Getter Functions for Persistent Storage

The coreMQTT library provides setter and getter functions to allow the application to store and restore MQTT state in persistent memory. This is useful for handling QoS2 messages after a device reboot.

### Setter Function

The `MQTT_SetOutgoingPublishRecord` function allows the application to set an outgoing publish record in the MQTT context. It can be used to restore the state of the MQTT context after a device reboot.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether a callback might be more suitable here?

Here is what i am thinking: The method described here will work perfectly fine when the reboot is planned. But what if the reboot is unplanned such as a power failure or link to the internet goes down.

In such cases, application might not have enough time to ping the library repeatedly for new packets. I understand that there might be race condition even in callback. But it is less pronounced.

When the application calls the API described above, it can either do that very frequently leading to higher CPU consumption, or it can do that slowly leading to a higher chance of a missed QoS2 publish when a reboot happens.


```c
MQTTStatus_t MQTT_SetOutgoingPublishRecord( MQTTContext_t * pMqttContext,
uint16_t packetId,
MQTTQoS_t qos,
MQTTPublishState_t publishState );
```

### Getter Function

The `MQTT_GetOutgoingPublishRecord` function allows the application to get an outgoing publish record from the MQTT context. It can be used to store the state of the MQTT context in persistent memory before a device reboot.

```c
MQTTStatus_t MQTT_GetOutgoingPublishRecord( const MQTTContext_t * pMqttContext,
uint16_t packetId,
MQTTQoS_t * pQos,
MQTTPublishState_t * pPublishState );
```

### Getting the Failed Packet ID

The `MQTT_GetFailedPacketId` function allows the application to get the packet ID of the failed packet from the MQTT context. It can be used to handle the situation when the library loses state after a device reboot.

```c
MQTTStatus_t MQTT_GetFailedPacketId( const MQTTContext_t * pMqttContext,
uint16_t * pPacketId );
```
90 changes: 90 additions & 0 deletions source/core_mqtt_state.c
Original file line number Diff line number Diff line change
Expand Up @@ -1204,3 +1204,93 @@ const char * MQTT_State_strerror( MQTTPublishState_t state )
}

/*-----------------------------------------------------------*/

MQTTStatus_t MQTT_SetOutgoingPublishRecord( MQTTContext_t * pMqttContext,
uint16_t packetId,
MQTTQoS_t qos,
MQTTPublishState_t publishState )
{
MQTTStatus_t status = MQTTSuccess;

if( ( pMqttContext == NULL ) || ( packetId == MQTT_PACKET_ID_INVALID ) || ( qos == MQTTQoS0 ) )
{
status = MQTTBadParameter;
}
else
{
status = addRecord( pMqttContext->outgoingPublishRecords,
pMqttContext->outgoingPublishRecordMaxCount,
packetId,
qos,
publishState );
}

return status;
}

MQTTStatus_t MQTT_GetOutgoingPublishRecord( const MQTTContext_t * pMqttContext,
uint16_t packetId,
MQTTQoS_t * pQos,
MQTTPublishState_t * pPublishState )
{
MQTTStatus_t status = MQTTSuccess;
size_t recordIndex;

if( ( pMqttContext == NULL ) || ( packetId == MQTT_PACKET_ID_INVALID ) || ( pQos == NULL ) || ( pPublishState == NULL ) )
{
status = MQTTBadParameter;
}
else
{
recordIndex = findInRecord( pMqttContext->outgoingPublishRecords,
pMqttContext->outgoingPublishRecordMaxCount,
packetId,
pQos,
pPublishState );

if( recordIndex == MQTT_INVALID_STATE_COUNT )
{
status = MQTTBadParameter;
}
}

return status;
}

MQTTStatus_t MQTT_GetFailedPacketId( const MQTTContext_t * pMqttContext,
uint16_t * pPacketId )
{
MQTTStatus_t status = MQTTSuccess;
size_t recordIndex;
MQTTQoS_t qos;
MQTTPublishState_t publishState;

if( ( pMqttContext == NULL ) || ( pPacketId == NULL ) )
{
status = MQTTBadParameter;
}
else
{
for( recordIndex = 0; recordIndex < pMqttContext->outgoingPublishRecordMaxCount; recordIndex++ )
{
if( pMqttContext->outgoingPublishRecords[ recordIndex ].packetId != MQTT_PACKET_ID_INVALID )
{
qos = pMqttContext->outgoingPublishRecords[ recordIndex ].qos;
publishState = pMqttContext->outgoingPublishRecords[ recordIndex ].publishState;

if( ( qos == MQTTQoS2 ) && ( publishState == MQTTPubRelSend ) )
{
*pPacketId = pMqttContext->outgoingPublishRecords[ recordIndex ].packetId;
break;
}
}
}

if( recordIndex == pMqttContext->outgoingPublishRecordMaxCount )
{
status = MQTTBadParameter;
}
}

return status;
}
61 changes: 57 additions & 4 deletions source/include/core_mqtt_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

Expand Down Expand Up @@ -301,6 +301,59 @@ uint16_t MQTT_PublishToResend( const MQTTContext_t * pMqttContext,
const char * MQTT_State_strerror( MQTTPublishState_t state );
/** @endcond */

/**
* @brief Set an outgoing publish record in the MQTT context.
*
* This function allows the application to set an outgoing publish record in the
* MQTT context. It can be used to restore the state of the MQTT context after a
* device reboot.
*
* @param[in] pMqttContext Initialized MQTT context.
* @param[in] packetId ID of the PUBLISH packet.
* @param[in] qos QoS of the PUBLISH packet.
* @param[in] publishState State of the PUBLISH packet.
*
* @return #MQTTBadParameter, #MQTTNoMemory, or #MQTTSuccess.
*/
MQTTStatus_t MQTT_SetOutgoingPublishRecord( MQTTContext_t * pMqttContext,
uint16_t packetId,
MQTTQoS_t qos,
MQTTPublishState_t publishState );

/**
* @brief Get an outgoing publish record from the MQTT context.
*
* This function allows the application to get an outgoing publish record from the
* MQTT context. It can be used to store the state of the MQTT context in persistent
* memory before a device reboot.
*
* @param[in] pMqttContext Initialized MQTT context.
* @param[in] packetId ID of the PUBLISH packet.
* @param[out] pQos QoS of the PUBLISH packet.
* @param[out] pPublishState State of the PUBLISH packet.
*
* @return #MQTTBadParameter, #MQTTSuccess.
*/
MQTTStatus_t MQTT_GetOutgoingPublishRecord( const MQTTContext_t * pMqttContext,
uint16_t packetId,
MQTTQoS_t * pQos,
MQTTPublishState_t * pPublishState );

/**
* @brief Get the packet ID of the failed packet.
*
* This function allows the application to get the packet ID of the failed packet
* from the MQTT context. It can be used to handle the situation when the library
* loses state after a device reboot.
*
* @param[in] pMqttContext Initialized MQTT context.
* @param[out] pPacketId ID of the failed packet.
*
* @return #MQTTBadParameter, #MQTTSuccess.
*/
MQTTStatus_t MQTT_GetFailedPacketId( const MQTTContext_t * pMqttContext,
uint16_t * pPacketId );

/* *INDENT-OFF* */
#ifdef __cplusplus
}
Expand Down
148 changes: 144 additions & 4 deletions test/unit-test/core_mqtt_state_utest.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

Expand Down Expand Up @@ -1168,3 +1168,143 @@ void test_MQTT_State_strerror( void )
}

/* ========================================================================== */

void test_MQTT_SetOutgoingPublishRecord( void )
{
MQTTContext_t mqttContext = { 0 };
MQTTStatus_t status;
const uint16_t PACKET_ID = 1;
const MQTTQoS_t qos = MQTTQoS2;
const MQTTPublishState_t publishState = MQTTPubRelSend;
TransportInterface_t transport;
MQTTFixedBuffer_t networkBuffer = { 0 };

transport.recv = transportRecvSuccess;
transport.send = transportSendSuccess;

MQTTPubAckInfo_t incomingRecords[ MQTT_STATE_ARRAY_MAX_COUNT ] = { 0 };
MQTTPubAckInfo_t outgoingRecords[ MQTT_STATE_ARRAY_MAX_COUNT ] = { 0 };

status = MQTT_Init( &mqttContext, &transport,
getTime, eventCallback, &networkBuffer );
TEST_ASSERT_EQUAL( MQTTSuccess, status );

status = MQTT_InitStatefulQoS( &mqttContext,
outgoingRecords, MQTT_STATE_ARRAY_MAX_COUNT,
incomingRecords, MQTT_STATE_ARRAY_MAX_COUNT );
TEST_ASSERT_EQUAL( MQTTSuccess, status );

/* Test for bad parameters */
status = MQTT_SetOutgoingPublishRecord( NULL, PACKET_ID, qos, publishState );
TEST_ASSERT_EQUAL( MQTTBadParameter, status );
status = MQTT_SetOutgoingPublishRecord( &mqttContext, MQTT_PACKET_ID_INVALID, qos, publishState );
TEST_ASSERT_EQUAL( MQTTBadParameter, status );
status = MQTT_SetOutgoingPublishRecord( &mqttContext, PACKET_ID, MQTTQoS0, publishState );
TEST_ASSERT_EQUAL( MQTTBadParameter, status );

/* Success. */
status = MQTT_SetOutgoingPublishRecord( &mqttContext, PACKET_ID, qos, publishState );
TEST_ASSERT_EQUAL( MQTTSuccess, status );
/* Verify the record is added correctly. */
TEST_ASSERT_EQUAL( PACKET_ID, mqttContext.outgoingPublishRecords[ 0 ].packetId );
TEST_ASSERT_EQUAL( qos, mqttContext.outgoingPublishRecords[ 0 ].qos );
TEST_ASSERT_EQUAL( publishState, mqttContext.outgoingPublishRecords[ 0 ].publishState );
}

/* ========================================================================== */

void test_MQTT_GetOutgoingPublishRecord( void )
{
MQTTContext_t mqttContext = { 0 };
MQTTStatus_t status;
const uint16_t PACKET_ID = 1;
const MQTTQoS_t qos = MQTTQoS2;
const MQTTPublishState_t publishState = MQTTPubRelSend;
MQTTQoS_t retrievedQos;
MQTTPublishState_t retrievedPublishState;
TransportInterface_t transport;
MQTTFixedBuffer_t networkBuffer = { 0 };

transport.recv = transportRecvSuccess;
transport.send = transportSendSuccess;

MQTTPubAckInfo_t incomingRecords[ MQTT_STATE_ARRAY_MAX_COUNT ] = { 0 };
MQTTPubAckInfo_t outgoingRecords[ MQTT_STATE_ARRAY_MAX_COUNT ] = { 0 };

status = MQTT_Init( &mqttContext, &transport,
getTime, eventCallback, &networkBuffer );
TEST_ASSERT_EQUAL( MQTTSuccess, status );

status = MQTT_InitStatefulQoS( &mqttContext,
outgoingRecords, MQTT_STATE_ARRAY_MAX_COUNT,
incomingRecords, MQTT_STATE_ARRAY_MAX_COUNT );
TEST_ASSERT_EQUAL( MQTTSuccess, status );

/* Test for bad parameters */
status = MQTT_GetOutgoingPublishRecord( NULL, PACKET_ID, &retrievedQos, &retrievedPublishState );
TEST_ASSERT_EQUAL( MQTTBadParameter, status );
status = MQTT_GetOutgoingPublishRecord( &mqttContext, MQTT_PACKET_ID_INVALID, &retrievedQos, &retrievedPublishState );
TEST_ASSERT_EQUAL( MQTTBadParameter, status );
status = MQTT_GetOutgoingPublishRecord( &mqttContext, PACKET_ID, NULL, &retrievedPublishState );
TEST_ASSERT_EQUAL( MQTTBadParameter, status );
status = MQTT_GetOutgoingPublishRecord( &mqttContext, PACKET_ID, &retrievedQos, NULL );
TEST_ASSERT_EQUAL( MQTTBadParameter, status );

/* No record found. */
status = MQTT_GetOutgoingPublishRecord( &mqttContext, PACKET_ID, &retrievedQos, &retrievedPublishState );
TEST_ASSERT_EQUAL( MQTTBadParameter, status );

/* Success. */
addToRecord( mqttContext.outgoingPublishRecords, 0, PACKET_ID, qos, publishState );
status = MQTT_GetOutgoingPublishRecord( &mqttContext, PACKET_ID, &retrievedQos, &retrievedPublishState );
TEST_ASSERT_EQUAL( MQTTSuccess, status );
/* Verify the record is retrieved correctly. */
TEST_ASSERT_EQUAL( qos, retrievedQos );
TEST_ASSERT_EQUAL( publishState, retrievedPublishState );
}

/* ========================================================================== */

void test_MQTT_GetFailedPacketId( void )
{
MQTTContext_t mqttContext = { 0 };
MQTTStatus_t status;
const uint16_t PACKET_ID = 1;
const MQTTQoS_t qos = MQTTQoS2;
const MQTTPublishState_t publishState = MQTTPubRelSend;
uint16_t retrievedPacketId;
TransportInterface_t transport;
MQTTFixedBuffer_t networkBuffer = { 0 };

transport.recv = transportRecvSuccess;
transport.send = transportSendSuccess;

MQTTPubAckInfo_t incomingRecords[ MQTT_STATE_ARRAY_MAX_COUNT ] = { 0 };
MQTTPubAckInfo_t outgoingRecords[ MQTT_STATE_ARRAY_MAX_COUNT ] = { 0 };

status = MQTT_Init( &mqttContext, &transport,
getTime, eventCallback, &networkBuffer );
TEST_ASSERT_EQUAL( MQTTSuccess, status );

status = MQTT_InitStatefulQoS( &mqttContext,
outgoingRecords, MQTT_STATE_ARRAY_MAX_COUNT,
incomingRecords, MQTT_STATE_ARRAY_MAX_COUNT );
TEST_ASSERT_EQUAL( MQTTSuccess, status );

/* Test for bad parameters */
status = MQTT_GetFailedPacketId( NULL, &retrievedPacketId );
TEST_ASSERT_EQUAL( MQTTBadParameter, status );
status = MQTT_GetFailedPacketId( &mqttContext, NULL );
TEST_ASSERT_EQUAL( MQTTBadParameter, status );

/* No record found. */
status = MQTT_GetFailedPacketId( &mqttContext, &retrievedPacketId );
TEST_ASSERT_EQUAL( MQTTBadParameter, status );

/* Success. */
addToRecord( mqttContext.outgoingPublishRecords, 0, PACKET_ID, qos, publishState );
status = MQTT_GetFailedPacketId( &mqttContext, &retrievedPacketId );
TEST_ASSERT_EQUAL( MQTTSuccess, status );
/* Verify the packet ID is retrieved correctly. */
TEST_ASSERT_EQUAL( PACKET_ID, retrievedPacketId );
}
Loading