-
Notifications
You must be signed in to change notification settings - Fork 0
/
pelion_websocket.py
147 lines (113 loc) · 4.15 KB
/
pelion_websocket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#!/usr/bin/python3
import sys, pycurl, certifi, websocket, json, base64
from io import BytesIO
def main(argv):
#get api key from command line argument
if len(sys.argv) > 1:
api_key = argv[0]
else:
print("Error: please pass API KEY")
sys.exit()
create_websocket(api_key)
resource_uri = '/3200/0/5501' #digital counter resource used in mbed-os-example-pelion
subscribe_to_resource(api_key,resource_uri)
def create_websocket(api_key):
#enable websocket notification channel
print("CREATE WEBHOOK")
#prepare request and buffer for response
buffer = BytesIO()
header = []
header.append("Authorization: Bearer " + api_key)
c = pycurl.Curl()
c.setopt(c.URL, 'https://api.us-east-1.mbedcloud.com/v2/notification/websocket')
c.setopt(c.WRITEDATA, buffer)
c.setopt(c.CAINFO, certifi.where())
c.setopt(c.NOPROGRESS,1)
c.setopt(c.NOBODY,1)
c.setopt(c.HEADER,1)
c.setopt(c.HTTPHEADER, header)
c.setopt(c.CUSTOMREQUEST, "PUT")
c.setopt(c.TCP_KEEPALIVE,1)
#execute request
c.perform()
c.close()
#print response
body = buffer.getvalue()
print(body.decode('iso-8859-1'))
#connect to registered websocket and open the channel
websocket.enableTrace(True)
header = []
header.append("Authorization: Bearer " + api_key)
ws = websocket.WebSocketApp("wss://api.us-east-1.mbedcloud.com/v2/notification/websocket-connect",
on_message = on_message,
on_error = on_error,
on_close = on_close,
header = header
)
ws.on_open = on_open
ws.run_forever()
def on_message(ws, message):
print(message)
#example notification that this parses
#{"notifications":[{"ep":"0172b9ba5409000000000001001117f5","path":"/3200/0/5501","ct":"text/plain","payload":"NzE=","max-age":0}]}
#convert from string to dict object
msg = json.loads(message)
#parse message for notifications
for notification in msg['notifications']:
#check for the device id, not always the same as endpoint id, but called endpoint here
if "ep" in notification:
ep = notification['ep']
print("device: " + ep)
#check for the resource uri, aka path
if "path" in notification:
resource_uri = notification['path']
print("resource: " + resource_uri)
#check for the resource value, aka payload
if "payload" in notification:
resource_value = str(base64.b64decode(notification['payload']))
print("value: " + resource_value)
def on_error(ws, error):
print(error)
def on_close(ws):
print("### closed ###")
def on_open(ws):
print("### opened ###")
def subscribe_to_resource(api_key,resource_uri):
#subscribe to a specific resource on any endpoint connected to your account
# curl -X PUT https://api.us-east-1.mbedcloud.com/v2/subscriptions \
# -H 'Authorization: Bearer PELION_API_KEY' \
# -H 'content-type: application/json' \
# -d '[
# {
# "endpoint-name": "*",
# "resource-path": ["/object/instance/resource", ... ]
# }
# ]'
print("SUBSCRIBE TO RESOURCE: " + resource_uri)
#prepare request and buffer for response
buffer = BytesIO()
header = []
header.append("Authorization: Bearer " + api_key)
c = pycurl.Curl()
c.setopt(c.URL, 'https://api.us-east-1.mbedcloud.com/v2/subscriptions/')
c.setopt(c.WRITEDATA, buffer)
c.setopt(c.CAINFO, certifi.where())
c.setopt(c.NOPROGRESS,1)
c.setopt(c.NOBODY,1)
c.setopt(c.HEADER,1)
c.setopt(c.HTTPHEADER, header)
c.setopt(c.CUSTOMREQUEST, "PUT")
c.setopt(c.TCP_KEEPALIVE,1)
data = '[{"endpoint-name": "*","resource-path": ["' + resource_uri + '"]}]'
print(data)
dbuffer = BytesIO(data.encode('utf-8'))
c.setopt(c.UPLOAD, 1)
c.setopt(c.READDATA, dbuffer)
#execute request
c.perform()
c.close()
#print response
body = buffer.getvalue()
print(body.decode('iso-8859-1'))
if __name__ == "__main__":
main(sys.argv[1:])