-
Notifications
You must be signed in to change notification settings - Fork 4
/
iothub_client_sample_mqtt.c
207 lines (179 loc) · 8.13 KB
/
iothub_client_sample_mqtt.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
#include <stdio.h>
#include <stdlib.h>
#include "iothub_client.h"
#include "iothub_message.h"
#include "azure_c_shared_utility/threadapi.h"
#include "azure_c_shared_utility/crt_abstractions.h"
#include "azure_c_shared_utility/platform.h"
#include "iothubtransportmqtt.h"
#include "azureiotcerts.h"
/* Connection string holding the hostname, device id and device credential, */
/* in one of the following two formats: */
/* "HostName=<host_name>;DeviceId=<device_id>;SharedAccessKey=<device_key>" */
/* "HostName=<host_name>;DeviceId=<device_id>;SharedAccessSignature=<device_sas_token>" */
/* NOTE(review): the literal below embeds a SharedAccessKey value directly in */
/* source. Consider loading it from device configuration instead, and rotating */
/* the key if this file has been published. */
static const char* connectionString = "HostName=AZ3166test01.azure-devices.cn;DeviceId=AZ3166TEST;SharedAccessKey=SQQOxf2rFCmVwlyXJh0ZEqxLb/1idodQNnY+ZKLl2Vs=";
/* Number of send confirmations received so far; reset at the start of */
/* iothub_client_sample_mqtt_run and incremented by SendConfirmationCallback. */
static int callbackCounter;
/* Scratch buffer in which the telemetry message body is formatted before each send. */
static char msgText[1024];
/* Scratch buffer for the value of the "PropName" property added to each outgoing message. */
static char propText[1024];
/* Main-loop flag: set to true when the sample starts and cleared by */
/* ReceiveMessageCallback when a message whose payload is "quit" arrives. */
static bool g_continueRunning;
/* Number of telemetry messages the sample sends; also the size of the message tracking array. */
#define MESSAGE_COUNT 3
/* Number of additional IoTHubClient_LL_DoWork calls made after the main loop exits. */
#define DOWORK_LOOP_NUM 3
/* Sensor readings defined in another translation unit. The sample only reads */
/* them when building the telemetry payload; units and update cadence are not */
/* visible from this file. */
extern float hts221_humidity; /* reported under the "Humidity" key */
extern float hts221_temp; /* reported under the "Temp" key */
extern int32_t p_data[3]; /* reported under the "magnet" key */
extern int32_t x_axes[3]; /* reported under the "acceleration" key */
extern int32_t g_axes[3]; /* reported under the "gra" key */
/* Per-message bookkeeping record. A pointer to one of these is handed to */
/* IoTHubClient_LL_SendEventAsync as the user context and comes back in */
/* SendConfirmationCallback. */
typedef struct EVENT_INSTANCE_TAG
{
IOTHUB_MESSAGE_HANDLE messageHandle; // Message queued for sending; destroyed in SendConfirmationCallback.
size_t messageTrackingId; // For tracking the messages within the user callback.
} EVENT_INSTANCE;
/*
 * Callback invoked for each cloud-to-device message delivered to the client
 * (registered through IoTHubClient_LL_SetMessageCallback).
 *
 * Prints the message id, correlation id, payload and any application
 * properties, and clears g_continueRunning when the payload is exactly the
 * four bytes "quit".
 *
 * message             - the received message; it is only queried here.
 * userContextCallback - pointer to an int counting received messages, which is
 *                       incremented once per call. May be NULL, in which case
 *                       the count is printed as 0 and nothing is incremented.
 *
 * Returns IOTHUBMESSAGE_ACCEPTED unconditionally.
 */
static IOTHUBMESSAGE_DISPOSITION_RESULT ReceiveMessageCallback(IOTHUB_MESSAGE_HANDLE message, void* userContextCallback)
{
    static const char quitCommand[] = "quit";

    int* counter = (int*)userContextCallback;
    /* Declared with the exact type the API writes through, so no pointer
       type-punning cast is needed on the out-parameter. */
    const unsigned char* buffer = NULL;
    size_t size = 0;
    MAP_HANDLE mapProperties;
    const char* messageId;
    const char* correlationId;

    // Message properties
    if ((messageId = IoTHubMessage_GetMessageId(message)) == NULL)
    {
        messageId = "<null>";
    }

    if ((correlationId = IoTHubMessage_GetCorrelationId(message)) == NULL)
    {
        correlationId = "<null>";
    }

    // Message content
    if (IoTHubMessage_GetByteArray(message, &buffer, &size) != IOTHUB_MESSAGE_OK)
    {
        (void)printf("unable to retrieve the message data\r\n");
    }
    else
    {
        /* The payload is not NUL-terminated, hence the explicit precision.
           An empty payload may come back with a NULL pointer, which must not
           be handed to %s. */
        const char* text = (buffer != NULL) ? (const char*)buffer : "";

        (void)printf("Received Message [%d]\r\n Message ID: %s\r\n Correlation ID: %s\r\n Data: <<<%.*s>>> & Size=%d\r\n",
            (counter != NULL) ? *counter : 0, messageId, correlationId, (int)size, text, (int)size);

        // If we receive the word 'quit' then we stop running
        if (size == (sizeof(quitCommand) - 1) && buffer != NULL && memcmp(buffer, quitCommand, size) == 0)
        {
            g_continueRunning = false;
        }
    }

    // Retrieve properties from the message
    mapProperties = IoTHubMessage_Properties(message);
    if (mapProperties != NULL)
    {
        const char*const* keys;
        const char*const* values;
        size_t propertyCount = 0;

        if (Map_GetInternals(mapProperties, &keys, &values, &propertyCount) == MAP_OK)
        {
            if (propertyCount > 0)
            {
                size_t index;

                (void)printf(" Message Properties:\r\n");
                for (index = 0; index < propertyCount; index++)
                {
                    (void)printf("\tKey: %s Value: %s\r\n", keys[index], values[index]);
                }
                (void)printf("\r\n");
            }
        }
    }

    /* Some device specific action code goes here... */
    if (counter != NULL)
    {
        (*counter)++;
    }
    return IOTHUBMESSAGE_ACCEPTED;
}
/*
 * Callback invoked once the client has a final result for a message that was
 * queued with IoTHubClient_LL_SendEventAsync.
 *
 * Logs the result, bumps callbackCounter and releases the message that was
 * tracked for this send.
 *
 * result              - delivery outcome reported by the client.
 * userContextCallback - the EVENT_INSTANCE that was passed when the message
 *                       was queued. May be NULL, in which case only the result
 *                       is logged and counted.
 */
static void SendConfirmationCallback(IOTHUB_CLIENT_CONFIRMATION_RESULT result, void* userContextCallback)
{
    EVENT_INSTANCE* eventInstance = (EVENT_INSTANCE*)userContextCallback;

    if (eventInstance == NULL)
    {
        /* Nothing to release; still report and count the confirmation. */
        (void)printf("Confirmation[%d] received without a message context, result = %s\r\n", callbackCounter, ENUM_TO_STRING(IOTHUB_CLIENT_CONFIRMATION_RESULT, result));
        callbackCounter++;
        return;
    }

    (void)printf("Confirmation[%d] received for message tracking id = %zu with result = %s\r\n", callbackCounter, eventInstance->messageTrackingId, ENUM_TO_STRING(IOTHUB_CLIENT_CONFIRMATION_RESULT, result));

    /* Some device specific action code goes here... */
    callbackCounter++;

    IoTHubMessage_Destroy(eventInstance->messageHandle);
    /* The record is owned by the caller and outlives this callback; clear the
       handle so a stale pointer is never observed or destroyed twice. */
    eventInstance->messageHandle = NULL;
}
void iothub_client_sample_mqtt_run(void)
{
IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle;
EVENT_INSTANCE messages[MESSAGE_COUNT];
g_continueRunning = true;
srand((unsigned int)time(NULL));
double avgWindSpeed = 10.0;
callbackCounter = 0;
int receiveContext = 0;
if (platform_init() != 0)
{
(void)printf("Failed to initialize the platform.\r\n");
}
else
{
if ((iotHubClientHandle = IoTHubClient_LL_CreateFromConnectionString(connectionString, MQTT_Protocol)) == NULL)
{
(void)printf("ERROR: iotHubClientHandle is NULL!\r\n");
}
else
{
bool traceOn = true;
IoTHubClient_LL_SetOption(iotHubClientHandle, "logtrace", &traceOn);
// For mbed add the certificate information
if (IoTHubClient_LL_SetOption(iotHubClientHandle, "TrustedCerts", certificates) != IOTHUB_CLIENT_OK)
{
printf("failure to set option \"TrustedCerts\"\r\n");
}
/* Setting Message call back, so we can receive Commands. */
if (IoTHubClient_LL_SetMessageCallback(iotHubClientHandle, ReceiveMessageCallback, &receiveContext) != IOTHUB_CLIENT_OK)
{
(void)printf("ERROR: IoTHubClient_LL_SetMessageCallback..........FAILED!\r\n");
}
else
{
(void)printf("IoTHubClient_LL_SetMessageCallback...successful.\r\n");
/* Now that we are ready to receive commands, let's send some messages */
size_t iterator = 0;
do
{
if (iterator < MESSAGE_COUNT)
{
// sprintf_s(msgText, sizeof(msgText), "{\"deviceId\":\"myFirstDevice\",\"windSpeed\":%.2f}", avgWindSpeed + (rand() % 4 + 2));
sprintf_s(msgText, sizeof(msgText), "{\"deviceId\":\"myFirstDevice\",\"Humidity\":%.2f,\"Temp\":%.2f,\"magnet\":%d,%d,%d,\"acceleration\":%d,%d,%d,\"gra\":%d,%d,%d}",hts221_humidity,hts221_temp,p_data[0], p_data[1], p_data[2],x_axes[0],x_axes[1],x_axes[2],g_axes[0],g_axes[1],g_axes[2]);
if ((messages[iterator].messageHandle = IoTHubMessage_CreateFromByteArray((const unsigned char*)msgText, strlen(msgText))) == NULL)
{
(void)printf("ERROR: iotHubMessageHandle is NULL!\r\n");
}
else
{
messages[iterator].messageTrackingId = iterator;
MAP_HANDLE propMap = IoTHubMessage_Properties(messages[iterator].messageHandle);
(void)sprintf_s(propText, sizeof(propText), "PropMsg_%zu", iterator);
if (Map_AddOrUpdate(propMap, "PropName", propText) != MAP_OK)
{
(void)printf("ERROR: Map_AddOrUpdate Failed!\r\n");
}
if (IoTHubClient_LL_SendEventAsync(iotHubClientHandle, messages[iterator].messageHandle, SendConfirmationCallback, &messages[iterator]) != IOTHUB_CLIENT_OK)
{
(void)printf("ERROR: IoTHubClient_LL_SendEventAsync..........FAILED!\r\n");
}
else
{
(void)printf("IoTHubClient_LL_SendEventAsync accepted message [%d] for transmission to IoT Hub.\r\n", (int)iterator);
}
}
}
IoTHubClient_LL_DoWork(iotHubClientHandle);
ThreadAPI_Sleep(1);
iterator++;
} while (g_continueRunning);
(void)printf("iothub_client_sample_mqtt has gotten quit message, call DoWork %d more time to complete final sending...\r\n", DOWORK_LOOP_NUM);
size_t index = 0;
for (index = 0; index < DOWORK_LOOP_NUM; index++)
{
IoTHubClient_LL_DoWork(iotHubClientHandle);
ThreadAPI_Sleep(1);
}
}
IoTHubClient_LL_Destroy(iotHubClientHandle);
}
platform_deinit();
}
}