-
Notifications
You must be signed in to change notification settings - Fork 57
/
message.go
88 lines (83 loc) · 2.3 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package iotservice
import (
"encoding/json"
"fmt"
"time"
"github.com/Azure/go-amqp"
"github.com/amenzhinsky/iothub/common"
)
// FromAMQPMessage converts an amqp.Message into a common.Message.
//
// Exported for use with a custom stream, for example when device telemetry
// is routed to an Event Hub instance.
//
// Message properties are copied to the corresponding fields, the well-known
// "iothub-*" annotations populate their dedicated fields, and every other
// annotation as well as all application properties end up in Properties.
// Values whose dynamic type differs from the expected one are stringified
// (or, for annotations, kept in Properties) instead of causing a panic.
func FromAMQPMessage(msg *amqp.Message) *common.Message {
	m := &common.Message{
		Payload:    msg.GetData(),
		Properties: make(map[string]string, len(msg.ApplicationProperties)+5),
	}

	if p := msg.Properties; p != nil {
		m.UserID = string(p.UserID)
		// MessageID and CorrelationID are interface-typed, so they are not
		// guaranteed to hold strings; never assert blindly.
		if p.MessageID != nil {
			m.MessageID = amqpValueString(p.MessageID)
		}
		if p.CorrelationID != nil {
			m.CorrelationID = amqpValueString(p.CorrelationID)
		}
		if p.To != nil {
			m.To = *p.To
		}
		m.ExpiryTime = p.AbsoluteExpiryTime
	}

	for k, v := range msg.Annotations {
		// Annotation keys are interface-typed; format them rather than
		// asserting so that a non-string key cannot panic.
		key := fmt.Sprint(k)

		switch k {
		case "iothub-enqueuedtime":
			t, ok := v.(time.Time)
			if !ok {
				// Leave EnqueuedTime unset rather than pointing it at the
				// zero time; keep the raw value so it is not lost.
				m.Properties[key] = fmt.Sprint(v)
				continue
			}
			m.EnqueuedTime = &t
		case "iothub-connection-device-id":
			m.ConnectionDeviceID = amqpValueString(v)
		case "iothub-connection-auth-generation-id":
			m.ConnectionDeviceGenerationID = amqpValueString(v)
		case "iothub-connection-auth-method":
			s, ok := v.(string)
			if !ok {
				m.Properties[key] = fmt.Sprint(v)
				continue
			}
			var am common.ConnectionAuthMethod
			if err := json.Unmarshal([]byte(s), &am); err != nil {
				// Not valid JSON: expose the raw annotation as a property.
				m.Properties[key] = s
				continue
			}
			m.ConnectionAuthMethod = &am
		case "iothub-message-source":
			m.MessageSource = amqpValueString(v)
		default:
			m.Properties[key] = fmt.Sprint(v)
		}
	}

	// Application properties are copied last, so on a key clash they win
	// over annotations. Non-string values are stored as empty strings.
	for k, v := range msg.ApplicationProperties {
		s, _ := v.(string) // s is "" when v is not a string
		m.Properties[k] = s
	}
	return m
}

// amqpValueString returns v unchanged when it is a string and its fmt.Sprint
// representation otherwise.
func amqpValueString(v interface{}) string {
	if s, ok := v.(string); ok {
		return s
	}
	return fmt.Sprint(v)
}
// toAMQPMessage converts a common.Message into an amqp.Message.
//
// The payload becomes the single data section, msg.Properties are copied
// into the application properties, and To, UserID, MessageID and
// CorrelationID are mapped onto the AMQP message properties.
//
// AbsoluteExpiryTime is set only when msg.ExpiryTime is non-nil; otherwise it
// stays nil so that no expiry is put on the message, instead of a pointer to
// the zero time.Time.
func toAMQPMessage(msg *common.Message) *amqp.Message {
	props := make(map[string]interface{}, len(msg.Properties))
	for k, v := range msg.Properties {
		props[k] = v
	}

	// Copy the value so the AMQP message does not alias the caller's time.
	var expiryTime *time.Time
	if msg.ExpiryTime != nil {
		t := *msg.ExpiryTime
		expiryTime = &t
	}

	return &amqp.Message{
		Data: [][]byte{msg.Payload},
		Properties: &amqp.MessageProperties{
			To:                 &msg.To,
			UserID:             []byte(msg.UserID),
			MessageID:          msg.MessageID,
			CorrelationID:      msg.CorrelationID,
			AbsoluteExpiryTime: expiryTime,
		},
		ApplicationProperties: props,
	}
}