-
-
Notifications
You must be signed in to change notification settings - Fork 8
/
msg_parser.py
124 lines (104 loc) · 3.29 KB
/
msg_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import json, threading, logging
class Message:
def __init__(self, messageType):
self.messageType = messageType
class Request(Message):
def __init__(self, name, id, arguments):
super(Request, self).__init__("Request")
self.name = name
self.id = id
self.arguments = arguments
class Response(Message):
def __init__(self, type, id, status, errors, data):
super(Response, self).__init__("Response")
self.type = type
self.id = id
self.status = status
self.errors = errors
self.data = data
class Event(Message):
def __init__(self, type, data):
super(Event, self).__init__("Event")
self.type = type
self.data = data
class MsgManager:
curId = 0
requestsPending = {}
def __init__(self):
self.id_lock = threading.RLock()
def sendRequest(self, interop, requestName, arguments, timeout=2):
curId = 0
with self.id_lock:
self.curId += 1
curId = self.curId
waitForResponse = threading.Event()
self.requestsPending[curId] = {"name": requestName, "event": waitForResponse}
interop.send("Request",
json.dumps(
{
"Id": curId,
"Name": requestName,
"Arguments": arguments
}))
waitResult = waitForResponse.wait(timeout)
res = self.requestsPending[curId]
self.requestsPending.pop(curId)
if not waitResult:
return None
if res['name'] == "Hello":
logging.getLogger(__name__).info("Got Hello response")
if (res["response"] != None and res["response"].errors != None and len(res["response"].errors) > 0):
logging.getLogger(__name__).info("Errors in response: '%s'", str(res["response"].errors))
return res["response"]
def sendRequestAsync(self, interop, requestName, arguments, callback):
curId = 0
with self.id_lock:
self.curId += 1
curId = self.curId
self.requestsPending[curId] = {"name": requestName, "callback": lambda res : self.callCallback(curId, res, callback)}
interop.send("Request",
json.dumps(
{
"Id": curId,
"Name": requestName,
"Arguments": arguments
}))
def sendEvent(self, interop, eventName, data):
interop.send("Event",
json.dumps(
{
"Name": eventName,
"Data": data
}))
def sendResponse(self, interop, id, status, result={}, errors=[]):
interop.send("Response",
json.dumps(
{
"Id": id,
"Status": status,
"Result": result,
"Errors": errors
}))
def callCallback(self, curId, response, callback):
self.requestsPending.pop(curId)
callback(response)
def parse(self, message):
messageParsed = json.loads(message[1])
messageType = message[0];
if messageType == "Response":
resId = messageParsed["Id"]
if resId in self.requestsPending:
name = self.requestsPending[resId]["name"]
response = Response(name, messageParsed["Id"], messageParsed["Status"], messageParsed["Errors"], messageParsed["Result"])
if "event" in self.requestsPending[resId].keys():
self.requestsPending[resId]["response"] = response
self.requestsPending[resId]["event"].set()
else:
self.requestsPending[resId]["callback"](response)
elif messageType == "Event":
return Event(messageParsed["Name"], messageParsed["Data"])
elif messageType == "Request":
return Request(messageParsed["Name"], messageParsed["Id"], messageParsed["Arguments"])
else:
logging.getLogger(__name__).warning("Fuse: Message type '" + messageType + "' unknown.")
return None