Skip to content

Commit

Permalink
Merge branch 'main' into job_listener_reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
cookpate authored Oct 30, 2024
2 parents eb2f7eb + 9c80302 commit 4fa03eb
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 0 deletions.
191 changes: 191 additions & 0 deletions iotcored/src/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
#define IOTCORED_NETWORK_BUFFER_SIZE 5000
#endif

#ifndef IOTCORED_UNACKED_PACKET_BUFFER_SIZE
#define IOTCORED_UNACKED_PACKET_BUFFER_SIZE (IOTCORED_NETWORK_BUFFER_SIZE * 3)
#endif

#define IOTCORED_MQTT_MAX_PUBLISH_RECORDS 10

static uint32_t time_ms(void);
Expand All @@ -53,6 +57,12 @@ struct NetworkContext {
IotcoredTlsCtx *tls_ctx;
};

typedef struct {
uint16_t packet_id;
uint8_t *serialized_packet;
size_t serialized_packet_len;
} StoredPublish;

static pthread_t recv_thread;
static pthread_t keepalive_thread;

Expand All @@ -71,6 +81,11 @@ static MQTTPubAckInfo_t
// TODO: Remove once no longer needed by coreMQTT
static MQTTPubAckInfo_t incoming_publish_record;

static StoredPublish unacked_publishes[IOTCORED_MQTT_MAX_PUBLISH_RECORDS]
= { 0 };

static uint8_t packet_store_buffer[IOTCORED_UNACKED_PACKET_BUFFER_SIZE];

pthread_mutex_t *coremqtt_get_send_mtx(const MQTTContext_t *ctx) {
(void) ctx;
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
Expand All @@ -89,6 +104,177 @@ static uint32_t time_ms(void) {
return (uint32_t) (tv.tv_sec * 1000 + tv.tv_usec / 1000);
}

// This implementation assumes that we always compact the memory when a free()
// call is made.
static uint8_t *mqtt_pub_alloc(size_t length) {
size_t i = 0;
for (; i < IOTCORED_MQTT_MAX_PUBLISH_RECORDS; i++) {
if (unacked_publishes[i].packet_id == 0) {
break;
}
}

if (i == IOTCORED_MQTT_MAX_PUBLISH_RECORDS) {
GGL_LOGE("Not enough spots in record array to store one more packet.");
return NULL;
}

uintptr_t last_packet_end;

if (i == 0) {
last_packet_end = (uintptr_t) &packet_store_buffer[0];
} else {
last_packet_end
= ((uintptr_t) unacked_publishes[i - 1].serialized_packet)
+ ((uintptr_t) unacked_publishes[i - 1].serialized_packet_len);
}

size_t bytes_filled
= (size_t) (last_packet_end - ((uintptr_t) &packet_store_buffer[0]));
size_t space_left
= (sizeof(packet_store_buffer) / sizeof(packet_store_buffer[0]))
- bytes_filled;

if (space_left < length) {
GGL_LOGE("Not enough space in buffer to store one more packet.");
return NULL;
}

return &packet_store_buffer[bytes_filled];
}

static void mqtt_pub_free(const uint8_t *ptr) {
size_t i = 0;
for (; i < IOTCORED_MQTT_MAX_PUBLISH_RECORDS; i++) {
if ((unacked_publishes[i].packet_id != 0)
&& (unacked_publishes[i].serialized_packet == ptr)) {
break;
}
}

// If we cannot find the entry. Log the error and exit.
if (i == IOTCORED_MQTT_MAX_PUBLISH_RECORDS) {
GGL_LOGE("Cannot find a matching publish record entry to free.");
return;
}

size_t byte_offset = unacked_publishes[i].serialized_packet_len;

if (i != (IOTCORED_MQTT_MAX_PUBLISH_RECORDS - 1)) {
size_t bytes_to_move
= (size_t) (((uintptr_t) &packet_store_buffer
[IOTCORED_UNACKED_PACKET_BUFFER_SIZE - 1])
- (((uintptr_t) unacked_publishes[i].serialized_packet)
+ unacked_publishes[i].serialized_packet_len)
+ 1U);

// Move the whole array after the freed packet forward in memory.
memmove(
unacked_publishes[i].serialized_packet,
(unacked_publishes[i].serialized_packet
+ unacked_publishes[i].serialized_packet_len),
bytes_to_move
);

// Compact the records.
for (; i < IOTCORED_MQTT_MAX_PUBLISH_RECORDS - 1; i++) {
if (unacked_publishes[i + 1].packet_id == 0) {
break;
}

unacked_publishes[i].packet_id = unacked_publishes[i + 1].packet_id;
unacked_publishes[i].serialized_packet
= unacked_publishes[i + 1].serialized_packet - byte_offset;
unacked_publishes[i].serialized_packet_len
= unacked_publishes[i + 1].serialized_packet_len;
}
}

// Clear the last record.
unacked_publishes[i].packet_id = 0;
unacked_publishes[i].serialized_packet = NULL;
unacked_publishes[i].serialized_packet_len = 0;

memset(
&packet_store_buffer
[(sizeof(packet_store_buffer) / sizeof(packet_store_buffer[0]))
- byte_offset],
0,
byte_offset
);
}

static bool mqtt_store_packet(
MQTTContext_t *context, uint16_t packet_id, MQTTVec_t *mqtt_vec
) {
(void) context;
size_t i;
for (i = 0; i < IOTCORED_MQTT_MAX_PUBLISH_RECORDS; i++) {
if (unacked_publishes[i].packet_id == 0) {
break;
}
}

if (i == IOTCORED_MQTT_MAX_PUBLISH_RECORDS) {
GGL_LOGE("No space left in array to store additional record.");
return false;
}

size_t memory_needed = MQTT_GetBytesInMQTTVec(mqtt_vec);

uint8_t *allocated_mem = mqtt_pub_alloc(memory_needed);
if (allocated_mem == NULL) {
return false;
}

MQTT_SerializeMQTTVec(allocated_mem, mqtt_vec);

unacked_publishes[i].packet_id = packet_id;
unacked_publishes[i].serialized_packet = allocated_mem;
unacked_publishes[i].serialized_packet_len = memory_needed;

GGL_LOGD("Stored MQTT publish (ID: %d).", packet_id);
return true;
}

static bool mqtt_retrieve_packet(
MQTTContext_t *context,
uint16_t packet_id,
uint8_t **serialized_mqtt_vec,
size_t *serialized_mqtt_vec_len
) {
(void) context;

for (size_t i = 0; i < IOTCORED_MQTT_MAX_PUBLISH_RECORDS; i++) {
if (unacked_publishes[i].packet_id == packet_id) {
*serialized_mqtt_vec = unacked_publishes[i].serialized_packet;
*serialized_mqtt_vec_len
= unacked_publishes[i].serialized_packet_len;

GGL_LOGD("Retrived MQTT publish (ID: %d).", packet_id);
return true;
}
}

GGL_LOGE("No packet with ID %d present.", packet_id);

return false;
}

static void mqtt_clear_packet(MQTTContext_t *context, uint16_t packet_id) {
(void) context;

for (size_t i = 0; i < IOTCORED_MQTT_MAX_PUBLISH_RECORDS; i++) {
if (unacked_publishes[i].packet_id == packet_id) {
mqtt_pub_free(unacked_publishes[i].serialized_packet);
GGL_LOGD("Cleared MQTT publish (ID: %d).", packet_id);
return;
}
}

GGL_LOGE("Cannot find the packet ID to clear.");
}

// Establish TLS and MQTT connection to the AWS IoT broker.
static GglError establish_connection(void *ctx) {
(void) ctx;
Expand Down Expand Up @@ -286,6 +472,11 @@ GglError iotcored_mqtt_connect(const IotcoredArgs *args) {
);
assert(mqtt_ret == MQTTSuccess);

mqtt_ret = MQTT_InitRetransmits(
&mqtt_ctx, mqtt_store_packet, mqtt_retrieve_packet, mqtt_clear_packet
);
assert(mqtt_ret == MQTTSuccess);

// Store a global variable copy.
iot_cored_args = args;

Expand Down
1 change: 1 addition & 0 deletions misc/dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ svcuids
sysv
tesd
tlog
unacked
unarchived
unrefp
unsuback
Expand Down

0 comments on commit 4fa03eb

Please sign in to comment.