-
Notifications
You must be signed in to change notification settings - Fork 0
/
message.h
292 lines (254 loc) · 8.08 KB
/
message.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
//
// Created by iwant on 2019/1/23.
//
#ifndef MQTT_BROKER_MESSAGE_H
#define MQTT_BROKER_MESSAGE_H
/**************************
* 消息(报文) 数据结构
**************************/
#include <stdbool.h>
/**
* 所有控制报文(消息)类型
*/
enum MessageType {
RESERVED_ = 0,
CONNECT = 1,
CONNACK = 2,
PUBLISH = 3,
PUBACK = 4,
PUBREC = 5,
PUBREL = 6,
PUBCOMP = 7,
SUBSCRIBE = 8,
SUBACK = 9,
UNSUBSCRIBE = 10,
UNSUBACK = 11,
PINGREQ = 12,
PINGRESP = 13,
DISCONNECT = 14,
RESERVED__ = 15,
};
/**
* 固定报头
*/
typedef struct _FixedHeader {
int message_type; /* 控制报文类型 */
int remain_len; /* 剩余长度 */
char flag; /* 控制报文标志 */
char reserved; /* 对齐 */
// 以下 PUBLISH 报文的固定头部才有(flag 的细化)
bool dup_flag; /* 重发标志 */
int qos; /* 服务质量 */
bool retain_flag; /* 保留标志 */
// 固定报头长度(非 MQTT 报文内容)
int fh_len; /* 固定报头长度(非 MQTT 报文内容) */
} FixedHeader;
/******************** CONNECT ********************/
/**
* CONNECT 可变报头
*/
typedef struct _ConnVariableHeader {
char *p_protocol_name; /* 恒指向为 ‘MQTT’ */
int protocol_name_len; /* 恒为 4 */
int level; /* 恒为 4 */
bool username_flag;
bool passwd_flag;
bool will_retain; /* 遗言相关 */
bool will_flag;
int will_qos;
bool is_clean_session; /* 是否清理会话 */
char reserved_[3]; /* 对齐 */
int ping_time; /* 即 KeepAlive */
} ConnVariableHeader;
/**
* CONNECT 有效载荷
*/
typedef struct _ConnPayload {
char *p_client_id; /* 客户端标识符 */
int client_id_len; /* 客户端标识符长度 */
char *p_will_topic; /* 遗言主题 */
int will_topic_len; /* 遗言主题长度 */
char *p_will_message; /* 遗言消息 */
int will_message_len; /* 遗言消息长度 */
char *p_username; /* 用户名 */
int username_len; /* 用户名长度 */
char *p_passwd; /* 密码 */
int passwd_len; /* 密码长度 */
} ConnPayload;
/******************** PUBLISH ********************/
/**
* PUBLISH 可变报头
*/
typedef struct _PublishVariableHeader {
char *p_topic; /* 发布的主题 */
int topic_len;
int message_id; /* 报文标识符,qos >= 1 时才有 */
} PublishVariableHeader;
/**
* PUBLISH 有效载荷
*/
typedef struct _PublishPayload {
char *p_content; /* 消息内容 */
int content_len; /* 内容长度 */
} PublishPayload;
/** PUBACK or PUBREC or PUBREL or PUBCOMP or SUBSCRIBE or UNSUBSCRIBE **/
/**
* PUBACK or PUBREC or PUBREL or PUBCOMP or SUBSCRIBE or UNSUBSCRIBE 的公共可变报头
*/
typedef struct _CommonVariableHeader {
int message_id; /* 报文标识符 */
} CommonVariableHeader;
/******************** SUBSCRIBE ********************/
/**
* 该结构体中所有指针指向的为重新申请的内存,所以注意释放
*/
typedef struct _SubscribePayload {
int topic_filter_len; /* 所含主题过滤器个数 */
int *p_topic_filter_qos; /* 订阅质量 */
char **p_topic_filter; /* 主题过滤器 */
} SubscribePayload;
/******************** UNSUBSCRIBE ********************/
/**
* 该结构体中所有指针指向的为重新申请的内存,所以注意释放
*/
typedef struct _UnsubscribePayload {
int topic_filter_len; /* 所含主题过滤器个数 */
char **p_topic_filter; /* 主题过滤器 */
} UnsubscribePayload;
/******************** MESSAGE(综) ********************/
/**
* 三个字段都要各自重新申请内存
*/
typedef struct _Message {
FixedHeader *p_fixed_header; /* 固定报头 */
void *p_variable_header; /* 可变报头 */
int vh_len; /* 可变报头的长度 */
void *p_payload; /* 有效载荷 */
int payload_len; /* 有效载荷的长度 */
} Message;
/**
* 用于进程间传递发布内容,或用于发送者未确认的消息存储
*/
typedef struct _PublishMessage {
int qos; /* 发布质量 */
char *p_topic; /* 发布主题 */
int topic_len; /* 主题长度 */
char *p_content; /* 发布内容 */
int content_len; /* 发布内容长度 */
int message_id; /* 发布的报文标识符 */
} PublishMessage;
/**************************
* 消息(报文) 相关操作
**************************/
/**
* 删除报文
* @param p_message
*/
void deleteMessage(void *p_message);
/**
* 创建 PublishMessage
* @param p_pm
* @param qos
* @param p_topic
* @param topic_len
* @param p_content
* @param content_len
* @param message_id
*/
void createPublishMessage(PublishMessage *p_pm, int qos, char *p_topic, int topic_len, char *p_content, int content_len, int message_id);
/**
* 删除“中间”消息(PublishMessage 结构体)
* @param p_pm
*/
void deletePublishMessage(void *p_pm);
/******************** 拆包 ********************/
/**
* 拆包:固定报头
* @param p_message
* @param p_fh
* @return 固定报头所占的字节数
*/
int parseFixedHeader(char *p_message, FixedHeader *p_fh);
/**
* 拆包:CONNECT (可变报头 & 有效载荷)
* @param p_message
* @param p_fh
* @param p_conn_vh
* @param p_conn_payload
*/
void parseConnect(char *p_message, FixedHeader *p_fh, ConnVariableHeader *p_conn_vh, ConnPayload *p_conn_payload);
/**
* 拆包:PUBLISH (可变报头 & 有效载荷)
* @param p_message
* @param p_fh
* @param p_conn_vh
* @param p_conn_payload
*/
void parsePublish(char *p_message, FixedHeader *p_fh, PublishVariableHeader *p_pub_vh, PublishPayload *p_pub_payload);
/**
* 拆包:通用可变报头(PUBACK or PUBREC or PUBREL or PUBCOMP or SUBSCRIBE or UNSUBSCRIBE)
* @param p_message
* @param p_common_vh
*/
void parseCommonVariableHeader(char *p_message, CommonVariableHeader *p_common_vh);
/**
* 拆包:SUBSCRIBE(可变报头 & 有效载荷)
* @param p_message
* @param p_fh
* @param p_vh
* @param p_sub_payload
*/
void parseSubscribe(char *p_message, FixedHeader *p_fh, CommonVariableHeader *p_vh, SubscribePayload *p_sub_payload);
/**
* 拆包:UNSUBSCRIBE(可变报头 & 有效载荷)
* @param p_message
* @param p_fh
* @param p_vh
* @param p_unsub_payload
*/
void
parseUnsubscribe(char *p_message, FixedHeader *p_fh, CommonVariableHeader *p_vh, UnsubscribePayload *p_unsub_payload);
/******************** 封包 ********************/
/**
* 封包:固定报头
* @param p_message
* @param p_fh
* @return
*/
int packageFixedHeader(char *p_message, FixedHeader *p_fh);
/**
* 封包:CONNACK
* @param p_message
* @param is_clean_session
* @param return_code
*/
void packageConnack(char *p_message, bool is_clean_session, char return_code);
/**
* 封包:PUBACK PUBREC PUBREL PUBCOMP
* @param p_message
* @param p_fh
* @param message_id
*/
void packageAck(char *p_message, FixedHeader *p_fh, int message_id);
/**
* 封包:PUBLISH
* @param p_message
* @param p_fh
* @param p_topic
* @param topic_len
* @param message_id
* @param p_content
* @param content_len
*/
void packagePublish(char *p_message, FixedHeader *p_fh, char *p_topic, int topic_len, int message_id, char *p_content,
int content_len);
/**
* 封包:SUBACK
* @param p_message
* @param p_fh
* @param message_id
* @param return_code_len
* @param p_return_code
*/
void packageSuback(char *p_message, FixedHeader *p_fh, int message_id, int return_code_len, char *p_return_code);
#endif //MQTT_BROKER_MESSAGE_H