-
Notifications
You must be signed in to change notification settings - Fork 0
/
MQTTSubscriber.lf
283 lines (244 loc) · 11.4 KB
/
MQTTSubscriber.lf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
/**
* Reactor that subscribes to a specified MQTT topic on which string messages are published. The
* timestamp of the output will depend on the `use_physical_time` parameter and (if present) the
* timestamp carried by the incoming message.
*
* If `use_physical_time` is `true` (the default), then this reactor uses the current physical time
* when the subscription notification arrives, plus the `offset`, as the desired output timestamp.
* If the incoming message is carrying a timestamp (the publisher is an instance of `MQTTPublisher`
* with `include_timestamp` set to `true`), and if `relative_timestamp` is false, then this reactor
* measures the *apparent latency* (the physical time of arrival minus the timestamp in the
* message). At shutdown, this reactor will report that maximum and average apparent latencies.
*
* If `use_physical_time` is `false`, then this reactor extracts the publisher's timestamp from the
* message and adds the specified offset to get the desired output timestamp, if there is a
* publisher's timestamp. If `relative_timestamp` is true (the default is `false`), then the
* timestamp is assumed to be relative to the start time of the program. Note that the start time of
* the sending program may not match the start time of this program, but this parameter can make it
* easier to synchronize the two programs. If there is no timestamp on the incoming message, then
* this prints a warning and uses physical time. If the resulting desired timestamp equals current
* logical time, then a microstep is added. If the desired output timestamp is in the past, then a
* warning will be printed and the tag of the message will be one microstep later than the current
* tag when it arrives.
*
* If `use_physical_time` is `false` and there is no publisher's timestamp on the message, then
* output timestamp will be as early as possible, usually one microstep later than the current
* logical time.
*
* Note that if the publisher and subscriber are both Lingua Franca programs, then the communication
* behaves like a physical connection if `use_physical_time` is true (the default). The offset is
* then equivalent to an `after` delay. By setting `use_physical_time` to false, you can get more
* control over the synchronization of the two programs.
*
* @param address The IP address of the MQTT broker.
* @param topic The topic name to which to subscribe.
* @param use_physical_time If true, then use physical time (the default).
* @param relative_timestamp If true, then the timestamp is relative to the start time of the
* program.
* @param offset The offset to add to the publisher's timestamp.
* @see MQTTPublisher.
*
* @author Ravi Akella
* @author Edward A. Lee
*/
target C
import MQTTBase from "private/MQTTBase.lf"
preamble {=
#ifndef MQTT_SUBSCRIBER_H
#define MQTT_SUBSCRIBER_H
// Fix the QoS to indicate that the message will be delivered reliably exactly once.
#define QOS 2
#include "MQTTClient.h" // Needs to be repeated even though it's in the base class.
typedef struct MQTTSubscriber_info_t {
void* logical_action;
environment_t* environment;
interval_t offset;
bool use_physical_time;
bool relative_timestamp;
interval_t latencies; // Sum of all observed latencies.
interval_t max_latency;
size_t count;
} MQTTSubscriber_info_t;
#endif // MQTT_SUBSCRIBER_H
=}
reactor MQTTSubscriber(
use_physical_time: bool = true,
relative_timestamp: bool = false,
offset: time = 0) extends MQTTBase {
preamble {=
// Count of instances of this reactor so that unique client IDs are generated.
static size_t _lf_MQTTSubscriber_count = 0;
// Connection options for the client.
// Making this global means that all instances of this reactor have
// the same connection options.
MQTTClient_connectOptions sub_connect_options = MQTTClient_connectOptions_initializer;
// Callback function invoked by MQTT when a message arrives.
int message_arrived(
void *info,
char *topic_name,
int topic_length, // If 0, strlen(topic_name) can be trusted.
MQTTClient_message *message
) {
instant_t physical_time = lf_time_physical();
// FIXME: This is assuming that the message string
// and topic_name are null terminated. What if they aren't?
// Perhaps force them to be?
LF_PRINT_LOG(
"MQTTSubscriber: Message arrived on topic %s: %s", topic_name, (char*)message->payload
);
MQTTSubscriber_info_t* my_info = (MQTTSubscriber_info_t*)info;
// Enter a critical section so that logical time does not elapse while
// we calculate the delay to the logical time for the message.
// Since this is outside the scope of a reaction, we have to explicitly
// pass in the environment.
lf_critical_section_enter(my_info->environment);
interval_t delay;
instant_t current_time = lf_time_logical(my_info->environment);
interval_t offset = my_info->offset;
// Extract the publisher's timestamp from the message, if it is present.
if (
// Is the string null terminated?
(int)((char*)message->payload)[message->payloadlen - sizeof(instant_t) - 5] == '\0'
// Is the magic string present?
&& memcmp("LFts", &message->payload[message->payloadlen - sizeof(instant_t) - 4], 4) == 0
) {
instant_t timestamp = extract_int64(
(unsigned char*)message->payload + message->payloadlen - sizeof(instant_t)
);
// Measure the latency, unless the timestamp is relative to the start time. In that case,
// it is not a latency.
if (!my_info->relative_timestamp) {
my_info->count++;
interval_t latency = physical_time - timestamp;
my_info->latencies += latency;
if (latency > my_info->max_latency) {
my_info->max_latency = latency;
}
}
if (my_info->use_physical_time) {
// Use physical time.
delay = physical_time + offset - current_time;
} else {
// Use logical time.
// If the timestamp is relative to the start time, then we need to add the start time.
if (my_info->relative_timestamp) {
timestamp += lf_time_start();
}
delay = timestamp + offset - current_time;
}
// Schedule the event.
// We rely on lf_schedule_copy to issue a warning if the delay is negative.
lf_schedule_copy(
((MQTTSubscriber_info_t*)info)->logical_action,
delay,
(char*)message->payload,
message->payloadlen - sizeof(instant_t)
);
} else {
if (!my_info->use_physical_time) {
// No timestamp included, so we schedule as early as possible.
lf_schedule_copy(
((MQTTSubscriber_info_t*)info)->logical_action,
0,
(char*)message->payload,
message->payloadlen
);
} else {
// Use physical time.
delay = physical_time + offset - current_time;
// Schedule the event.
// We rely on lf_schedule_copy to issue a warning if the delay is negative.
lf_schedule_copy(
((MQTTSubscriber_info_t*)info)->logical_action,
delay,
(char*)message->payload,
message->payloadlen
);
}
}
LF_PRINT_LOG(
"MQTTSubscriber: Received message. Timestamp will be " PRINTF_TIME
" ahead of current (elapsed) time, " PRINTF_TIME, delay, current_time - lf_time_start()
);
lf_critical_section_exit(my_info->environment);
// MQTTClient_freeMessage() also frees the memory allocated to the payload,
// which is why we have to copy the message here.
MQTTClient_freeMessage(&message);
MQTTClient_free(topic_name);
// Return true to indicate that the message has been successfully handled.
return 1;
}
/** Callback invoked if the connection is lost. */
void sub_connection_lost(void *info, char *cause) {
// NOTE: cause is always NULL. Likely cause is a duplicate ID.
lf_print_error_and_exit("MQTTSubscriber: Connection lost. Possibly another client with the same ID has connected.");
}
// Callback invoked if this client gets diconnected.
void sub_disconnected(void *context, MQTTProperties *properties, enum MQTTReasonCodes reasonCode) {
lf_print_error_and_exit("MQTTSubscriber: Has been disconnected: %s", MQTTReasonCode_toString(reasonCode));
}
=}
/**
* Output for sending the incoming MQTT message. Use type char* rather than string because it is
* not a static string, but rather dynamically allocated memory.
*/
output message: char*
/**
* Action that is triggered when there is an incoming MQTT message. Use a logical action here so
* that the callback function can precisely control timestamp of the received message.
*/
logical action act: char*
/** State variable storing the MQTT client created for each instance of this reactor. */
state client: MQTTClient = {= NULL =}
/** Struct containing the action and offset. */
state info: MQTTSubscriber_info_t = {= {NULL, NULL, 0LL, false, false, 0LL, 0LL, 0} =}
reaction(startup) -> act {=
int rc; // response code.
// In case there are multiple instances of this or the subscriber, enter
// a critical section. The Paho MQTT functions are not thread safe.
lf_critical_section_enter(self->base.environment);
sub_connect_options.keepAliveInterval = 20;
sub_connect_options.cleansession = 1;
self->info.logical_action = act;
self->info.environment = self->base.environment;
self->info.offset = self->offset;
self->info.use_physical_time = self->use_physical_time;
self->info.relative_timestamp = self->relative_timestamp;
// Set up callback functions.
// Last argument should be a pointer to a function to
// handle notification of delivery of a sent message.
// But this reactor isn't sending any messages.
rc = MQTTClient_setCallbacks(self->client, &self->info, sub_connection_lost, message_arrived, NULL);
check(rc, FATAL, "Failed to set callbacks");
rc = MQTTClient_setDisconnected(self->client, NULL, sub_disconnected);
check(rc, FATAL, "Failed to set disconnected callback");
// Connect to the broker.
rc = MQTTClient_connect(self->client, &sub_connect_options);
check(rc, FATAL, "Failed to connect to broker. Is one running?");
rc = MQTTClient_subscribe(self->client, self->topic, QOS);
check(rc, FATAL, "Failed to subscribe");
lf_critical_section_exit(self->base.environment);
=}
reaction(act) -> message {=
// The action contains a token that we can just forward.
// The allocated memory will be freed when the token's reference count hits 0.
// Note that this token will still contain the publisher's timestamp.
lf_set_token(message, act->token);
=}
reaction(shutdown) {=
if (self->info.count > 0) {
lf_print(
"MQTTSubscriber: Maximum apparent latency measured at receiver (in nsec): " PRINTF_TIME,
self->info.max_latency
);
lf_print(
"MQTTSubscriber: Average apparent latency measured at receiver (in nsec): " PRINTF_TIME,
self->info.latencies/self->info.count
);
}
int rc = MQTTClient_disconnect(self->client, 10000);
check(rc, NOT_FATAL, "Failed to disconnect");
MQTTClient_destroy(&self->client);
LF_PRINT_LOG("MQTTSubscriber: Client ID %s disconnecting.", self->clientID);
=}
}