Skip to content

Commit 86ff8fb

Browse files
authored
Fix message processing context init, usages, and cleanup. (Azure#696)
1 parent 370da99 commit 86ff8fb

File tree

1 file changed

+45
-5
lines changed

1 file changed

+45
-5
lines changed

src/utils/d2c_messaging/src/d2c_messaging.c

+45-5
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,19 @@ static void DefaultIoTHubSendReportedStateCompletedCallback(int http_status_code
205205
Log_Debug("context:0x%x", context);
206206
ADUC_D2C_Message_Processing_Context* message_processing_context = (ADUC_D2C_Message_Processing_Context*)context;
207207
int computed = false;
208+
209+
if (message_processing_context == NULL)
210+
{
211+
Log_Error("context is NULL");
212+
return;
213+
}
214+
215+
if (!message_processing_context->initialized)
216+
{
217+
Log_Warn("Message processing context (0x%x) is not initialized.", message_processing_context);
218+
return;
219+
}
220+
208221
pthread_mutex_lock(&message_processing_context->mutex);
209222
message_processing_context->message.lastHttpStatus = http_status_code;
210223

@@ -328,10 +341,30 @@ void ADUC_D2C_Messaging_DoWork()
328341
}
329342
}
330343

344+
/**
345+
* @brief Processes the message.
346+
* @param message_processing_context The message processing context.
347+
* @return Returns true if the message is sent.
348+
* @remark This function must be called every 100ms - 200ms to ensure that the Device to Cloud messages
349+
* are processed in timely manner but also yield the CPU to other tasks.
350+
*/
331351
static void ProcessMessage(ADUC_D2C_Message_Processing_Context* message_processing_context)
332352
{
333353
bool shouldSend = false;
334354
time_t now = GetTimeSinceEpochInSeconds();
355+
356+
if (message_processing_context == NULL)
357+
{
358+
Log_Error("context is NULL");
359+
return;
360+
}
361+
362+
if (!message_processing_context->initialized)
363+
{
364+
Log_Warn("Message processing context (0x%x) is not initialized.", message_processing_context);
365+
return;
366+
}
367+
335368
pthread_mutex_lock(&s_pendingMessageStoreMutex);
336369
pthread_mutex_lock(&message_processing_context->mutex);
337370

@@ -427,15 +460,17 @@ bool ADUC_D2C_Messaging_Init()
427460
memset(&s_pendingMessageStore, 0, sizeof(s_pendingMessageStore));
428461
for (i = 0; i < ADUC_D2C_Message_Type_Max; i++)
429462
{
430-
s_messageProcessingContext[i].type = i;
431-
s_messageProcessingContext[i].transportFunc = ADUC_D2C_Default_Message_Transport_Function;
432-
s_messageProcessingContext[i].retryStrategy = &g_defaultRetryStrategy;
433-
int res = pthread_mutex_init(&s_messageProcessingContext[i].mutex, NULL);
463+
int res = pthread_mutex_init(&s_messageProcessingContext[i].mutex, NULL);
434464
if (res != 0)
435465
{
436466
Log_Error("Can't init mutex for type %d. (err:%d)", i, res);
437467
goto done;
438468
}
469+
s_messageProcessingContext[i].type = i;
470+
s_messageProcessingContext[i].transportFunc = ADUC_D2C_Default_Message_Transport_Function;
471+
s_messageProcessingContext[i].retryStrategy = &g_defaultRetryStrategy;
472+
s_messageProcessingContext[i].initialized = true;
473+
Log_Debug("Message processing context initialized. (t:%d)", i);
439474
}
440475
s_core_initialized = true;
441476
}
@@ -456,7 +491,7 @@ void ADUC_D2C_Messaging_Uninit()
456491
if (s_core_initialized)
457492
{
458493
// Cancel pending messages
459-
for (int i = 0; i < ADUC_D2C_Message_Type_Max; i++)
494+
for (int i = 0; i < ADUC_D2C_Message_Type_Max && s_messageProcessingContext[i].initialized; i++)
460495
{
461496
pthread_mutex_lock(&s_messageProcessingContext[i].mutex);
462497
if (s_pendingMessageStore[i].content != NULL)
@@ -546,6 +581,11 @@ bool ADUC_D2C_Message_SendAsync(
546581
*/
547582
void ADUC_D2C_Messaging_Set_Transport(ADUC_D2C_Message_Type type, ADUC_D2C_MESSAGE_TRANSPORT_FUNCTION transportFunc)
548583
{
584+
if (!s_messageProcessingContext[type].initialized)
585+
{
586+
Log_Error("Message processing context (0x%x) is not initialized.", &s_messageProcessingContext[type]);
587+
return;
588+
}
549589
pthread_mutex_lock(&s_messageProcessingContext[type].mutex);
550590
s_messageProcessingContext[type].transportFunc = transportFunc;
551591
pthread_mutex_unlock(&s_messageProcessingContext[type].mutex);

0 commit comments

Comments
 (0)