-
Notifications
You must be signed in to change notification settings - Fork 11
/
test.py
executable file
·235 lines (194 loc) · 9.75 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
#!/usr/bin/python2
import time
import unittest
from application import log
from application.notification import IObserver, NotificationCenter
from application.python import Null
from application.python.queue import EventQueue
from threading import Event
from zope.interface import implements
from otr import OTRTransport, OTRSession, OTRState, SMPStatus
from otr.cryptography import DSAPrivateKey
from otr.exceptions import IgnoreMessage
class DataConnection(object):
implements(IObserver)
def __init__(self, name):
self.name = name
self.secret = None
self.private_key = DSAPrivateKey.generate()
self.otr_session = OTRSession(self.private_key, transport=self)
self.peer = None
self.send_queue = EventQueue(handler=self._send_handler)
self.send_queue.start()
self.ake_done = Event()
self.smp_done = Event()
self.all_done = Event()
self.otr_done = Event()
self.smp_status = None
self.same_secrets = None
self.sent_message = None
self.received_message = None
NotificationCenter().add_observer(self, sender=self.otr_session)
def _send_handler(self, message):
time.sleep(0.01)
self.peer.receive(message)
def connect(self, peer):
self.peer = peer
def disconnect(self):
self.send_queue.stop()
self.send_queue = None
def start_otr(self, secret=None):
self.secret = secret
self.otr_session.start()
def stop_otr(self):
self.otr_session.stop()
def inject_otr_message(self, message):
log.debug("{0.name} sending: {1!r}".format(self, message))
self.send_queue.put(message)
def send(self, content, content_type='text/plain'):
log.debug("{0.name} encoding: {1!r}".format(self, content))
self.sent_message = content
content = self.otr_session.handle_output(content, content_type)
log.debug("{0.name} sending: {1!r}".format(self, content))
self.send_queue.put(content)
def receive(self, message):
# log.debug("{0.name} received: {1!r}".format(self, message))
try:
message = self.otr_session.handle_input(message, 'text/plain')
except IgnoreMessage:
return
else:
log.debug("{0.name} decoded: {1!r}".format(self, message))
self.received_message = message
self.all_done.set()
def handle_notification(self, notification):
handler = getattr(self, '_NH_{0.name}'.format(notification), Null)
handler(notification)
def _NH_OTRSessionStateChanged(self, notification):
if notification.data.new_state is OTRState.Encrypted:
self.ake_done.set()
if self.secret is None:
self.smp_done.set()
elif self.name < self.peer.name:
self.otr_session.smp_verify(secret=self.secret)
elif notification.data.old_state is OTRState.Encrypted:
self.otr_done.set()
def _NH_OTRSessionSMPVerificationDidStart(self, notification):
if notification.data.originator == 'remote':
if self.secret:
self.otr_session.smp_answer(secret=self.secret)
else:
self.otr_session.smp_abort()
def _NH_OTRSessionSMPVerificationDidNotStart(self, notification):
self.smp_status = notification.data.reason
self.smp_done.set()
def _NH_OTRSessionSMPVerificationDidEnd(self, notification):
self.same_secrets = notification.data.same_secrets
self.smp_status = notification.data.status
self.smp_done.set()
OTRTransport.register(DataConnection)
class NotificationObserver(object):
implements(IObserver)
def start(self):
notification_center = NotificationCenter()
notification_center.add_observer(self)
def stop(self):
notification_center = NotificationCenter()
notification_center.remove_observer(self)
@staticmethod
def handle_notification(notification):
log.debug("--- {0.name!s} from {0.sender!r} with data: {0.data!r}".format(notification))
class OTRTest(unittest.TestCase):
notification_observer = None
@classmethod
def setUpClass(cls):
cls.notification_observer = NotificationObserver()
cls.notification_observer.start()
@classmethod
def tearDownClass(cls):
cls.notification_observer.stop()
cls.notification_observer = None
def setUp(self):
self.local_endpoint = DataConnection('local')
self.remote_endpoint = DataConnection('remote')
self.local_endpoint.connect(self.remote_endpoint)
self.remote_endpoint.connect(self.local_endpoint)
def tearDown(self):
self.local_endpoint.disconnect()
self.remote_endpoint.disconnect()
def test_ake_one_way(self):
self.local_endpoint.start_otr()
self.local_endpoint.ake_done.wait(1)
self.remote_endpoint.ake_done.wait(1)
self.assertIs(self.local_endpoint.otr_session.state, OTRState.Encrypted, "AKE failed on local endpoint")
self.assertIs(self.remote_endpoint.otr_session.state, OTRState.Encrypted, "AKE failed on remote endpoint")
def test_ake_two_way(self):
self.local_endpoint.start_otr()
self.remote_endpoint.start_otr()
self.local_endpoint.ake_done.wait(1)
self.remote_endpoint.ake_done.wait(1)
self.assertIs(self.local_endpoint.otr_session.state, OTRState.Encrypted, "AKE failed on local endpoint")
self.assertIs(self.remote_endpoint.otr_session.state, OTRState.Encrypted, "AKE failed on remote endpoint")
def test_smp_same_secret(self):
self.local_endpoint.start_otr(secret='foobar')
self.remote_endpoint.start_otr(secret='foobar')
self.local_endpoint.smp_done.wait(1)
self.remote_endpoint.smp_done.wait(1)
self.assertIs(self.local_endpoint.smp_status, SMPStatus.Success, "SMP was not successful for the local endpoint")
self.assertIs(self.remote_endpoint.smp_status, SMPStatus.Success, "SMP was not successful for the remote endpoint")
self.assertTrue(self.local_endpoint.same_secrets, "SMP didn't find that secrets were the same for the local endpoint")
self.assertTrue(self.remote_endpoint.same_secrets, "SMP didn't find that secrets were the same for the remote endpoint")
def test_smp_different_secret(self):
self.local_endpoint.start_otr(secret='foobar')
self.remote_endpoint.start_otr(secret='foobar2')
self.local_endpoint.smp_done.wait(1)
self.remote_endpoint.smp_done.wait(1)
self.assertIs(self.local_endpoint.smp_status, SMPStatus.Success, "SMP was not successful for the local endpoint")
self.assertIs(self.remote_endpoint.smp_status, SMPStatus.Success, "SMP was not successful for the remote endpoint")
self.assertFalse(self.local_endpoint.same_secrets, "SMP didn't find that secrets were different for the local endpoint")
self.assertFalse(self.remote_endpoint.same_secrets, "SMP didn't find that secrets were different for the remote endpoint")
def test_smp_unavailable(self):
self.local_endpoint.start_otr(secret='foobar')
self.remote_endpoint.start_otr(secret=None) # remote endpoint will abort the SMP as it doesn't have a secret
self.local_endpoint.smp_done.wait(1)
self.remote_endpoint.smp_done.wait(1)
self.assertIs(self.local_endpoint.smp_status, SMPStatus.Interrupted, "SMP was not aborted for the local endpoint")
self.assertIs(self.remote_endpoint.smp_status, SMPStatus.Interrupted, "SMP was not aborted for the remote endpoint")
def test_text_encryption(self):
self.local_endpoint.start_otr()
self.remote_endpoint.start_otr()
self.local_endpoint.ake_done.wait(1)
self.remote_endpoint.ake_done.wait(1)
self.local_endpoint.send('hello')
self.remote_endpoint.send('test')
self.local_endpoint.all_done.wait(1)
self.remote_endpoint.all_done.wait(1)
self.assertEqual(self.local_endpoint.sent_message, self.remote_endpoint.received_message, "The message sent by local was not received correctly on remote")
self.assertEqual(self.remote_endpoint.sent_message, self.local_endpoint.received_message, "The message sent by remote was not received correctly on local")
def test_otr_shutdown_one_way(self):
self.local_endpoint.start_otr()
self.remote_endpoint.start_otr()
self.local_endpoint.ake_done.wait(1)
self.remote_endpoint.ake_done.wait(1)
self.local_endpoint.stop_otr()
self.local_endpoint.otr_done.wait(1)
self.remote_endpoint.otr_done.wait(1)
self.assertIs(self.local_endpoint.otr_session.state, OTRState.Plaintext, "Local session state is not Plaintext")
self.assertIs(self.remote_endpoint.otr_session.state, OTRState.Finished, "Remote session state is not Finished")
self.remote_endpoint.stop_otr()
self.assertIs(self.remote_endpoint.otr_session.state, OTRState.Plaintext, "Remote session state is not Plaintext")
def test_otr_shutdown_two_way(self):
self.local_endpoint.start_otr()
self.remote_endpoint.start_otr()
self.local_endpoint.ake_done.wait(1)
self.remote_endpoint.ake_done.wait(1)
self.local_endpoint.stop_otr()
self.remote_endpoint.stop_otr()
self.local_endpoint.otr_done.wait(1)
self.remote_endpoint.otr_done.wait(1)
self.assertIs(self.local_endpoint.otr_session.state, OTRState.Plaintext, "Local session state is not Plaintext")
self.assertIs(self.remote_endpoint.otr_session.state, OTRState.Plaintext, "Remote session state is not Plaintext")
if __name__ == '__main__':
log.Formatter.prefix_format = '{record.levelname:<8s} '
log.level.current = log.level.INFO
unittest.main(verbosity=2)