unit_tests.py
import logging
import math
import os
import random
import shutil
import unittest
import ui_helpers
from kademlia_dht.buckets import BucketList, KBucket
from kademlia_dht.constants import Constants
from kademlia_dht.contact import Contact
from kademlia_dht.dht import DHT
from kademlia_dht.errors import RPCError, TooManyContactsError
from kademlia_dht.id import ID
from kademlia_dht.networking import TCPSubnetServer, TCPServer
from kademlia_dht.node import Node
from kademlia_dht.protocols import TCPSubnetProtocol, VirtualProtocol
from kademlia_dht.routers import ParallelRouter, Router
from kademlia_dht.storage import VirtualStorage, SecondaryJSONStorage
Constants.DEBUG = True
if Constants.DEBUG:
random.seed(0)
logger = ui_helpers.create_logger(verbose=True)
logger.info("Starting unit tests.")
def setup_split_failure(bucket_list=None):
# force host node ID to < 2 ** 159 so the node ID is not in the
# 2 ** 159 ... 2 ** 160 range.
# b_host_id = bytearray()
# b_host_id.extend([0] * 20)
# b_host_id[19] = 0x7F
# May be incorrect - book does some weird byte manipulation.
host_id: ID = ID.random_id(2 ** 158, 2 ** 159 - 1)
dummy_contact: Contact = Contact(host_id, VirtualProtocol())
dummy_contact.protocol.node = Node(dummy_contact, VirtualStorage())
if not bucket_list:
bucket_list = BucketList(our_contact=dummy_contact)
bucket_list.our_id = host_id
# Also add a contact in this 0 - 2 ** 159 range
# This ensures that only 1 bucket split will occur after 20 nodes with ID >= 2 ** 159 are added.
dummy_contact = Contact(ID(1), VirtualProtocol())
dummy_contact.protocol.node = Node(dummy_contact, VirtualStorage())
bucket_list.add_contact(Contact(ID(1), dummy_contact.protocol))
assert len(bucket_list.buckets) == 1 # Bucket split should not have occurred.
assert len(bucket_list.buckets[0].contacts) == 1 # Expected 1 contact in bucket 0.
# Make sure contact IDs all have the same 5-bit prefix and are
# in the 2 ** 159 ... 2 ** 160 - 1 space.
b_contact_id: bytearray = bytearray()
b_contact_id.extend([0] * 20)
b_contact_id[19] = 0x80
# 1000 xxxx prefix, xxxx starts at 1000 (8)
# this ensures that all the contacts in a bucket match only the
# prefix as only the first 5 bits are shared.
# |----| shared range
# 1000 1000 ...
# 1000 1100 ...
# 1000 1110 ...
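# Added note: b_contact_id is deliberately NOT reset between loop iterations, so each
# successive contact ID sets one more bit below the prefix (0x88, 0x8C, 0x8E, 0x8F, ...
# in the most significant byte). Every generated ID therefore carries the 1000 1 prefix,
# which keeps all K contacts together in the same deep bucket for the split-failure tests.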
shifter = 0x08
pos: int = 19
for _ in range(Constants.K):
b_contact_id[pos] |= shifter # |= is Bitwise OR.
contact_id: ID = ID(
int.from_bytes(b_contact_id, byteorder="little")
)
dummy_contact = Contact(ID(1), VirtualProtocol())
dummy_contact.protocol.node = Node(dummy_contact, VirtualStorage())
bucket_list.add_contact(
Contact(contact_id, dummy_contact.protocol)
)
shifter >>= 1 # Right shift (Halves the shift value.)
if shifter == 0:
shifter = 0x80
pos -= 1
return bucket_list
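# Illustration only (not part of the original test suite): a minimal sketch of how
# setup_split_failure() can be inspected by hand. Nothing below calls this helper; it
# simply prints the bucket layout (1 contact in the low bucket, K in the high one)
# that ForceFailedAddTest and BucketManagementTests rely on.
def _print_split_failure_layout():
    bucket_list = setup_split_failure()
    for i, bucket in enumerate(bucket_list.buckets):
        print(f"bucket {i}: {len(bucket.contacts)} contacts, "
              f"low={bucket.low()}, high={bucket.high()}")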
class KBucketTest(unittest.TestCase):
def test_add_to_kbucket(self):
"""
Description
K contacts added to an empty bucket.
Expected
No exception should be raised, and the bucket should contain Constants.K contacts.
:return:
"""
k = Constants.K
k_bucket = KBucket()
for i in range(k):
contact = Contact(ID(i))
k_bucket.add_contact(contact)
self.assertTrue(len(k_bucket.contacts) == k)
def test_too_many_contacts(self):
"""
Description
Contact added to a full bucket.
Expected
TooManyContactsError should be raised when the (K + 1)th contact is added.
Notes
This implies standard behaviour of contacts being added to a relatively empty bucket,
as filling the bucket with K contacts is a prerequisite of this test.
:return:
"""
k = Constants.K
k_bucket = KBucket()
for i in range(k):
contact = Contact(ID(i))
k_bucket.add_contact(contact)
with self.assertRaises(TooManyContactsError):
# Trying to add one more contact should raise the exception
contact = Contact(ID(k + 1))
k_bucket.add_contact(contact)
def test_no_funny_business(self):
"""
Description
Compare a KBucket created with no initial contacts parameter to one
created with an empty initial contacts parameter.
Expected
They are the same.
:return:
"""
k1: KBucket = KBucket(low=0, high=100)
k2: KBucket = KBucket(low=10, high=200, initial_contacts=[])
self.assertTrue(k1.contacts == k2.contacts)
class AddContactTest(unittest.TestCase):
def test_unique_id_add(self):
"""
Description
Adding K contacts to bucket list.
Expected
Bucket list should not split into separate buckets, and K contacts should exist in one bucket.
:return:
"""
dummy_contact: Contact = Contact(id=ID(0),
protocol=VirtualProtocol())
dummy_contact.protocol.node = Node(dummy_contact, VirtualStorage())
bucket_list: BucketList = BucketList(dummy_contact)
bucket_list.our_id = ID.random_id()
for i in range(Constants.K):
bucket_list.add_contact(Contact(ID.random_id()))
self.assertTrue(
len(bucket_list.buckets) == 1, "No split should have taken place.")
self.assertTrue(
len(bucket_list.buckets[0].contacts) == Constants.K,
"K contacts should have been added.")
def test_duplicate_id(self):
"""
Description
Adding 1 contact to a bucket list twice.
Expected
The bucket list should realise that the contact ID already exists in the buckets,
therefore it should not be added.
:return:
"""
dummy_contact = Contact(ID(0), VirtualProtocol())
dummy_contact.protocol.node = Node(dummy_contact, VirtualStorage())
bucket_list: BucketList = BucketList(dummy_contact)
bucket_list.our_id = ID.random_id()
id: ID = ID.random_id()
bucket_list.add_contact(Contact(id))
bucket_list.add_contact(Contact(id))
self.assertTrue(
len(bucket_list.buckets) == 1, "No split should have taken place.")
self.assertTrue(
len(bucket_list.buckets[0].contacts) == 1,
"Bucket should have one contact.")
def test_bucket_split(self):
"""
Description
Adding K + 1 contacts to an empty bucket list.
Expected
The bucket list should split into 2 separate buckets.
:return:
"""
dummy_contact = Contact(ID(0), VirtualProtocol())
dummy_contact.protocol.node = Node(dummy_contact, VirtualStorage())
bucket_list: BucketList = BucketList(dummy_contact)
bucket_list.our_id = ID.random_id()
for i in range(Constants.K):
bucket_list.add_contact(Contact(ID.random_id()))
bucket_list.add_contact(Contact(ID.random_id()))
print(f"KBucket range for first bucket: {bucket_list.buckets[0].low()}, "
f"{bucket_list.buckets[0].high()}, high log 2: {math.log(bucket_list.buckets[0].high(), 2)}")
print(f"KBucket range for second bucket: {bucket_list.buckets[1].low()}, "
f"{bucket_list.buckets[1].high()}, high log 2: {math.log(bucket_list.buckets[1].high(), 2)}")
self.assertTrue(
len(bucket_list.buckets) > 1,
"Bucket should have split into two or more buckets. "
f"Length of first buckets contacts = {len(bucket_list.buckets[0].contacts)}")
class ForceFailedAddTest(unittest.TestCase):
def test_force_failed_add(self):
"""
Description
Creates a bucket list composed of K IDs with a depth of 5 in the range 2 ** 159 to 2 ** 160 - 1,
along with another contact with an ID in the range 0 to 2 ** 159 - 1.
Then another contact is added with an ID >= 2 ** 159.
Expected
A bucket split should occur, with 1 contact in the first bucket and 20 contacts in the second bucket.
Then, when the 22nd contact is added, nothing should change, because the depth of the bucket it is being
added to is a multiple of 5 (depth MOD 5 == 0).
:return:
"""
dummy_contact = Contact(id=ID(0))
node = Node(contact=dummy_contact, storage=VirtualStorage())
bucket_list: BucketList = setup_split_failure()
self.assertTrue(len(bucket_list.buckets) == 2,
f"Bucket split should have occurred. Number of buckets should be 2, is {len(bucket_list.buckets)}.")
self.assertTrue(len(bucket_list.buckets[0].contacts) == 1,
"Expected 1 contact in bucket 0.")
self.assertTrue(len(bucket_list.buckets[1].contacts) == 20,
"Expected 20 contacts in bucket 1.")
# This next contact should not split the bucket as
# depth == 5 and therefore adding the contact will fail.
# Any unique ID >= 2^159 will do.
id = 2 ** 159 + 4
new_contact = Contact(id=ID(id),
protocol=dummy_contact.protocol)
bucket_list.add_contact(new_contact)
self.assertTrue(len(bucket_list.buckets) == 2,
f"Bucket split should have occured. Number of buckets should be 2, is {len(bucket_list.buckets)}.")
self.assertTrue(len(bucket_list.buckets[0].contacts) == 1,
"Expected 1 contact in bucket 0.")
self.assertTrue(len(bucket_list.buckets[1].contacts) == 20,
"Expected 20 contacts in bucket 1.")
self.assertTrue(new_contact not in bucket_list.buckets[1].contacts,
"Expected new contact NOT to replace an older contact.")
class DHTTest(unittest.TestCase):
def test_local_store_find_value(self):
vp = VirtualProtocol()
# NOTE: the reference implementation passes a VirtualStorage() instance here; this port passes the class as storage_factory instead.
dht = DHT(id=ID.random_id(),
protocol=vp,
storage_factory=VirtualStorage,
router=Router())
# print(dht._originator_storage)
vp.node = dht._router.node
key = ID.random_id()
dht.store(key, "Test")
found, contacts, return_val = dht.find_value(key)
print(found, contacts, return_val)
self.assertTrue(return_val == "Test",
"Expected to get back what we stored.")
def test_value_stored_in_closer_node(self):
"""
This test creates a single contact and stores the value in that contact. We set up the IDs so that the
contact’s ID is less (XOR metric) than our peer’s ID, and we use a key of ID.Zero to prevent further
complexities when computing the distance. Most of the code here is to set up the conditions to make this test!
- "The Kademlia Protocol Succinctly" by Marc Clifton
:return: None
"""
vp1 = VirtualProtocol()
vp2 = VirtualProtocol()
store1 = VirtualStorage()
store2 = VirtualStorage()
# Ensures that all nodes are closer, because id.max ^ n < id.max when n > 0.
# (the distance between a node and max id is always closer than the furthest possible)
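# Worked example (added for clarity): with 4-bit IDs, 0b1111 ^ 0b0110 == 0b1001 (9),
# which is less than 0b1111 (15), illustrating id.max ^ n < id.max for n > 0.
# Since the key below is ID.min() (all zeros), the other contact's distance to the key
# is simply its own ID, which is smaller than our peer's distance of ID.max().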
dht = DHT(id=ID.max(), router=Router(), protocol=vp1, originator_storage=store1,
republish_storage=store1, cache_storage=VirtualStorage())
vp1.node = dht._router.node
contact_id: ID = ID.mid() # middle ID
other_contact: Contact = Contact(id=contact_id, protocol=vp2)
other_node = Node(contact=other_contact, storage=store2)
vp2.node = other_node
# add this other contact to our peer list
dht._router.node.bucket_list.add_contact(other_contact)
# we want an integer distance, not an XOR distance.
key: ID = ID.min()
val = "Test"
other_node.simply_store(key, val)
self.assertFalse(store1.contains(key),
"Expected our peer to NOT have cached the key-value.")
self.assertTrue(store2.contains(key),
"Expected other node to HAVE cached the key-value.")
# Try and find the value, given our Dht knows about the other contact.
_, _, retval = dht.find_value(key)
self.assertTrue(retval == val,
"Expected to get back what we stored")
def test_value_stored_in_further_node(self):
vp1 = VirtualProtocol()
vp2 = VirtualProtocol()
store1 = VirtualStorage()
store2 = VirtualStorage()
# Ensures that all nodes are closer, because max id ^ n < max id when n > 0.
dht: DHT = DHT(id=ID.min(), protocol=vp1, router=Router(), storage_factory=lambda: store1)
vp1.node = dht._router.node
contact_id = ID.max()
other_contact = Contact(contact_id, vp2)
other_node = Node(other_contact, store2)
vp2.node = other_node
# Add this other contact to our peer list.
dht._router.node.bucket_list.add_contact(other_contact)
key = ID(1)
val = "Test"
other_node.simply_store(key, val)
self.assertFalse(store1.contains(key),
"Expected our peer to have NOT cached the key-value.")
self.assertTrue(store2.contains(key),
"Expected other node to HAVE cached the key-value.")
_, _, retval = dht.find_value(key)
self.assertTrue(retval == val,
"Expected to get back what we stored.")
def test_value_stored_gets_propagated(self):
vp1 = VirtualProtocol()
vp2 = VirtualProtocol()
store1 = VirtualStorage()
store2 = VirtualStorage()
dht: DHT = DHT(id=ID.min(), protocol=vp1, router=Router(), storage_factory=lambda: store1)
vp1.node = dht._router.node
contact_id = ID.mid()
other_contact = Contact(contact_id, vp2)
other_node = Node(other_contact, store2)
vp2.node = other_node
dht._router.node.bucket_list.add_contact(other_contact)
key = ID(0)
val = "Test"
self.assertFalse(store1.contains(key),
"Obviously we don't have the key-value yet.")
self.assertFalse(store2.contains(key),
"And equally obvious, the other peer doesn't have the key-value yet either.")
dht.store(key, val)
self.assertTrue(store1.contains(key),
"Expected our peer to have stored the key-value.")
self.assertTrue(store2.contains(key),
"Expected the other peer to have stored the key-value.")
def test_value_propagates_to_closer_node(self):
vp1 = VirtualProtocol()
vp2 = VirtualProtocol()
vp3 = VirtualProtocol()
store1 = VirtualStorage()
store2 = VirtualStorage()
store3 = VirtualStorage()
cache3 = VirtualStorage()
dht: DHT = DHT(id=ID.max(),
protocol=vp1,
router=Router(),
originator_storage=store1,
republish_storage=store1,
cache_storage=VirtualStorage())
vp1.node = dht._router.node
# setup node 2
contact_id_2 = ID.mid()
other_contact_2 = Contact(contact_id_2, vp2)
other_node_2 = Node(other_contact_2, store2)
vp2.node = other_node_2
# add the second contact to our peer list.
dht._router.node.bucket_list.add_contact(other_contact_2)
# node 2 has the value
key = ID(0)
val = "Test"
other_node_2.storage.set(key, val)
# setup node 3
contact_id_3 = ID(2 ** 158) # I think this is the same as ID.Zero.SetBit(158)?
other_contact_3 = Contact(contact_id_3, vp3)
other_node_3 = Node(other_contact_3, storage=store3, cache_storage=cache3)
vp3.node = other_node_3
# add the third contact to our peer list
dht._router.node.bucket_list.add_contact(other_contact_3)
self.assertFalse(store1.contains(key),
"Obviously we don't have the key-value yet.")
self.assertFalse(store3.contains(key),
"And equally obvious, the third peer doesn't have the key-value yet either.")
ret_found, ret_contacts, ret_val = dht.find_value(key)
self.assertTrue(ret_found, "Expected value to be found.")
self.assertFalse(store3.contains(key), "Key should not be in the republish store.")
self.assertTrue(cache3.contains(key), "Key should be in the cache store.")
self.assertTrue(cache3.get_expiration_time_sec(key.value) == Constants.EXPIRATION_TIME_SEC / 2,
"Expected 12 hour expiration.")
class DHTParallelTest(unittest.TestCase):
"""
The exact same as DHTTest, but with the asynchronous router instead of the normal router.
"""
def test_local_store_find_value(self):
vp = VirtualProtocol()
# NOTE: the reference implementation passes a VirtualStorage() instance here; this port passes the class as storage_factory instead.
dht = DHT(id=ID.random_id(),
protocol=vp,
storage_factory=VirtualStorage,
router=ParallelRouter())
vp.node = dht._router.node
key = ID.random_id()
dht.store(key, "Test")
_, _, return_val = dht.find_value(key)
self.assertTrue(return_val == "Test",
"Expected to get back what we stored.")
def test_value_stored_in_closer_node(self):
"""
This test creates a single contact and stores the value in that contact. We set up the IDs so that the
contact’s ID is less (XOR metric) than our peer’s ID, and we use a key of ID.Zero to prevent further
complexities when computing the distance. Most of the code here is to set up the conditions to make this test!
- "The Kademlia Protocol Succinctly" by Marc Clifton
:return: None
"""
vp1 = VirtualProtocol()
vp2 = VirtualProtocol()
store1 = VirtualStorage()
store2 = VirtualStorage()
# Ensures that all nodes are closer, because id.max ^ n < id.max when n > 0.
# (the distance between a node and max id is always closer than the furthest possible)
dht = DHT(id=ID.max(), router=ParallelRouter(), storage_factory=lambda: store1, protocol=vp1)
vp1.node = dht._router.node
contact_id: ID = ID.mid() # middle ID
other_contact: Contact = Contact(id=contact_id, protocol=vp2)
other_node = Node(contact=other_contact, storage=store2)
vp2.node = other_node
# add this other contact to our peer list
dht._router.node.bucket_list.add_contact(other_contact)
# we want an integer distance, not an XOR distance.
key: ID = ID.min()
val = "Test"
other_node.simply_store(key, val)
self.assertFalse(store1.contains(key),
"Expected our peer to NOT have cached the key-value.")
self.assertTrue(store2.contains(key),
"Expected other node to HAVE cached the key-value.")
# Try and find the value, given our Dht knows about the other contact.
_, _, retval = dht.find_value(key)
self.assertTrue(retval == val,
"Expected to get back what we stored")
def test_value_stored_in_further_node(self):
vp1 = VirtualProtocol()
vp2 = VirtualProtocol()
store1 = VirtualStorage()
store2 = VirtualStorage()
# Ensures that all nodes are closer, because max id ^ n < max id when n > 0.
dht: DHT = DHT(id=ID.min(), protocol=vp1, router=ParallelRouter(), storage_factory=lambda: store1)
vp1.node = dht._router.node
contact_id = ID.max()
other_contact = Contact(contact_id, vp2)
other_node = Node(other_contact, store2)
vp2.node = other_node
# Add this other contact to our peer list.
dht._router.node.bucket_list.add_contact(other_contact)
key = ID(1)
val = "Test"
other_node.simply_store(key, val)
self.assertFalse(store1.contains(key),
"Expected our peer to have NOT cached the key-value.")
self.assertTrue(store2.contains(key),
"Expected other node to HAVE cached the key-value.")
retval: str = dht.find_value(key)[2]
self.assertTrue(retval == val,
"Expected to get back what we stored.")
def test_value_stored_gets_propagated(self):
vp1 = VirtualProtocol()
vp2 = VirtualProtocol()
store1 = VirtualStorage()
store2 = VirtualStorage()
dht: DHT = DHT(id=ID.min(), protocol=vp1, router=ParallelRouter(), storage_factory=lambda: store1)
vp1.node = dht._router.node
contact_id = ID.mid()
other_contact = Contact(contact_id, vp2)
other_node = Node(other_contact, store2)
vp2.node = other_node
dht._router.node.bucket_list.add_contact(other_contact)
key = ID(0)
val = "Test"
self.assertFalse(store1.contains(key),
"Obviously we don't have the key-value yet.")
self.assertFalse(store2.contains(key),
"And equally obvious, the other peer doesn't have the key-value yet either.")
dht.store(key, val)
self.assertTrue(store1.contains(key),
"Expected our peer to have stored the key-value.")
self.assertTrue(store2.contains(key),
"Expected the other peer to have stored the key-value.")
# def test_value_propagates_to_closer_node(self):
# vp1 = VirtualProtocol()
# vp2 = VirtualProtocol()
# vp3 = VirtualProtocol()
#
# store1 = VirtualStorage()
# store2 = VirtualStorage()
# store3 = VirtualStorage()
#
# cache3 = VirtualStorage()
#
# dht: DHT = DHT(id=ID.max(), protocol=vp1, router=ParallelRouter(), storage_factory=lambda: store1)
#
# vp1.node = dht._router.node
#
# # setup node 2
# contact_id_2 = ID.mid()
# other_contact_2 = Contact(contact_id_2, vp2)
# other_node_2 = Node(other_contact_2, store2)
# vp2.node = other_node_2
# # add the second contact to our peer list.
# dht._router.node.bucket_list.add_contact(other_contact_2)
# # node 2 has the value
# key = ID(0)
# val = "Test"
# other_node_2.storage.set(key, val)
#
# # setup node 3
# contact_id_3 = ID(2 ** 158) # I think this is the same as ID.Zero.SetBit(158)?
# other_contact_3 = Contact(contact_id_3, vp3)
# other_node_3 = Node(other_contact_3, store3, cache_storage=cache3)
# vp3.node = other_node_3
# # add the third contact to our peer list
# dht._router.node.bucket_list.add_contact(other_contact_3)
#
# self.assertFalse(store1.contains(key),
# "Obviously we don't have the key-value yet.")
#
# self.assertFalse(store3.contains(key),
# "And equally obvious, the third peer doesn't have the key-value yet either.")
#
# ret_found, ret_contacts, ret_val = dht.find_value(key)
#
# self.assertTrue(ret_found, "Expected value to be found.")
# self.assertFalse(store3.contains(key), "Key should not be in the republish store.")
# self.assertTrue(cache3.contains(key), "Key should be in the cache store.")
# self.assertTrue(cache3.get_expiration_time_sec(key.value) == Constants.EXPIRATION_TIME_SEC / 2,
# "Expected 12 hour expiration.")
class BootstrappingTests(unittest.TestCase):
def test_random_within_bucket_tests(self):
test_cases: list[tuple[int, int]] = [
(0, 256), # 7 bits should be set
(256, 1024), # 2 bits (256 + 512) should be set
(65536, 65536 * 2), # no additional bits should be set.
(65536, 65536 * 4), # 2 bits (65536 and 65536*2) should be set.
(65536, 65536 * 16) # 4 bits (65536, 65536*2, 65536*4, 65536*8) set.
]
for test_case in test_cases:
low = test_case[0]
high = test_case[1]
bucket: KBucket = KBucket(low=low, high=high)
id: ID = ID.random_id_within_bucket_range(bucket)
self.assertTrue(bucket.is_in_range(id))
def test_bootstrap_within_bootstrapping_bucket(self):
"""
Test the bootstrap process within a bootstrapping bucket scenario.
This test creates a network with 22 virtual protocols representing nodes.
It sets up a bootstrap peer, with the bootstrapper having knowledge of 10 other nodes,
and one of those nodes having knowledge of another 10 nodes. The goal is to simulate
the bootstrap process and ensure that the expected number of contacts is received.
We need 22 virtual protocols. One for the bootstrap peer,
10 for the nodes the bootstrap peer knows about, and 10 for
the nodes one of the nodes knows about. And one for us to
rule them all.
:return: None
"""
vp: list[VirtualProtocol] = []
# creates 22 virtual protocols
for i in range(22):
vp.append(VirtualProtocol())
# us
dht_us: DHT = DHT(ID.random_id(), vp[0], storage_factory=VirtualStorage, router=Router())
vp[0].node = dht_us._router.node
# our bootstrap peer
dht_bootstrap: DHT = DHT(ID.random_id(), vp[1], storage_factory=VirtualStorage, router=Router())
vp[1].node = dht_bootstrap._router.node
# Pre-bind n so PyCharm doesn't warn that it could be undefined later; this placeholder value is never actually used.
n: Node = Node(Contact(ID.random_id(), vp[0]), VirtualStorage())
# Our bootstrapper knows 10 contacts
for i in range(10):
c: Contact = Contact(ID.random_id(), vp[i + 2])
n: Node = Node(c, VirtualStorage())
vp[i + 2].node = n
dht_bootstrap._router.node.bucket_list.add_contact(c)
# One of those nodes, in this case the last one we added to our bootstrapper (for convenience's sake)
# knows about 10 other contacts
# n is the last one our bootstrapper knows
node_who_knows_10 = n # Ignore PyCharm error saying it can be referenced before being created.
del n # bad naming, don't want to use it later on.
# create the 10 it knows about
for i in range(10):
c: Contact = Contact(ID.random_id(), vp[i + 12])
n2: Node = Node(c, VirtualStorage())
vp[i + 12].node = n2
node_who_knows_10.bucket_list.add_contact(c)
dht_us.bootstrap(dht_bootstrap._router.node.our_contact)
sum_of_contacts = len(dht_us._router.node.bucket_list.contacts())
print(f"sum of contacts: {sum_of_contacts}")
self.assertTrue(sum_of_contacts == 11,
"Expected our peer to get 11 contacts.")
def test_bootstrap_outside_bootstrapping_bucket(self):
"""
Test the bootstrap process when bootstrapping from a DHT node with contacts outside its own bucket.
This test simulates the scenario where a DHT node (dht_us) attempts to bootstrap from another node
(dht_bootstrap) whose contact list includes nodes outside its immediate bucket range. The goal is to
verify that the bootstrapping process correctly adds and organizes contacts, and handles bucket splits.
Steps:
1. Create two DHT nodes, dht_us and dht_bootstrap, and establish virtual protocols (vp) for communication.
2. Populate dht_bootstrap with 20 contacts, with one contact having an ID >= 2 ** 159 to trigger a bucket split.
3. Add 10 contacts to one of the nodes in dht_bootstrap's contact list (simulating contacts outside the bucket).
4. Perform bootstrap operation from dht_us to dht_bootstrap.
5. Verify that dht_us has a total of 31 contacts after bootstrapping.
Raises:
AssertionError: If the number of contacts in dht_us after bootstrapping is not 31.
"""
vp: list[VirtualProtocol] = []
for i in range(32):
vp.append(VirtualProtocol())
# Us, ID doesn't matter.
dht_us: DHT = DHT(ID.random_id(), vp[0], storage_factory=VirtualStorage, router=Router())
vp[0].node = dht_us._router.node
# our bootstrap peer
# all IDs are < 2 ** 159
dht_bootstrap: DHT = DHT(ID.random_id(0, 2 ** 159 - 1), vp[1], storage_factory=VirtualStorage, router=Router())
vp[1].node = dht_bootstrap._router.node
# print(sum([len([c for c in b.contacts]) for b in dht_bootstrap._router.node.bucket_list.buckets]))
# Our bootstrapper knows 20 contacts
for i in range(19):
# creating 19 shell contacts
id: ID = ID.random_id(0, 2 ** 159 - 1)
c: Contact = Contact(id, vp[i + 2])
c.protocol.node = Node(c, VirtualStorage())
dht_bootstrap._router.node.bucket_list.add_contact(c)
# for 20th
# all IDs are < 2 ** 159, except the last one, which is >= 2 ** 159
# Which will force a bucket split for us
id = ID.max()
important_contact: Contact = Contact(id, vp[21])
n = Node(important_contact, VirtualStorage())
# add 10 contacts to node
# this basically means that the bootstrapper knows 20 contacts, one of them knows 10 contacts.
# we're trying to add all 30 + bootstrapper so 31.
for i in range(10):
# creating 10 shell contacts
c2: Contact = Contact(ID.random_id(), vp[i + 22])
n2 = Node(c2, VirtualStorage())
vp[i + 22].node = n2
# adding the 10 shell contacts
n.bucket_list.add_contact(c2)  # Note: these contacts are added to the important (20th) contact's node, not to the bootstrapper.
important_contact.protocol.node = n
dht_bootstrap._router.node.bucket_list.add_contact(important_contact) # adds the 1 important contact.
# just making sure vp[i + 2].node = n works retrospectively on c.
self.assertTrue(n.our_contact.id == important_contact.protocol.node.our_contact.id == ID.max())
self.assertTrue(len(n.bucket_list.contacts()) == 10,
f"contacts: {len(n.bucket_list.contacts())}")
self.assertTrue(len(important_contact.protocol.node.bucket_list.contacts()) == 10,
f"contacts: {len(n.bucket_list.contacts())}")
self.assertTrue(important_contact.id == ID.max(), "What else could it be?")
self.assertTrue(ID.max() in [c.id for c in dht_bootstrap._router.node.bucket_list.contacts()],
"Contact we just added to bucket list should be in bucket list.")
# print("DHT Bootstrap contact length =", len(dht_bootstrap._router.node.bucket_list.contacts()))
self.assertTrue(len(dht_bootstrap._router.node.bucket_list.contacts()) == 20,
"DHT Bootstrapper must have 20 contacts.")
# One of those nodes, in this case specifically the last one we added to our bootstrapper so that it isn't in
# the bucket of our bootstrapper, we add 10 contacts. The IDs of those contacts don't matter.
self.assertTrue([len(b.contacts) for b in n.bucket_list.buckets] == [10],
"Must have 10 contacts in node.")
# print("Starting bootstrap...")
dht_us.bootstrap(dht_bootstrap._router.node.our_contact)
# print("Bootstrap finished!")
# print(f"\nLength of buckets: {[len(b.contacts) for b in dht_us._router.node.bucket_list.buckets]}")
sum_of_contacts = len(dht_us._router.node.bucket_list.contacts())
self.assertTrue(sum_of_contacts == 31,
f"Expected our peer to have 31 contacts, {sum_of_contacts} were given.")
class BucketManagementTests(unittest.TestCase):
def test_non_responding_contact_evicted(self):
"""
Tests that a nonresponding contact is evicted after
Constants.EVICTION_LIMIT tries.
"""
dht = DHT(ID(0), VirtualProtocol(), storage_factory=VirtualStorage, router=Router())
bucket_list: BucketList = setup_split_failure(dht.node.bucket_list)
self.assertTrue(len(bucket_list.buckets) == 2,
"Bucket split should have occurred.")
self.assertTrue(len(bucket_list.buckets[0].contacts) == 1,
"Expected 1 contact in bucket 0.")
self.assertTrue(len(bucket_list.buckets[1].contacts) == 20,
"Expected 20 contacts in bucket 1.")
# The bucket is now full. Pick the first contact, as it is last
# seen (they are added in chronological order.)
non_responding_contact: Contact = bucket_list.buckets[1].contacts[0]
# Since the protocols are shared, we need to assign
# a unique protocol for this node for testing.
vp_unresponding: VirtualProtocol = VirtualProtocol(
non_responding_contact.protocol.node,
False
)
non_responding_contact.protocol = vp_unresponding
next_new_contact = Contact(
ID(2 ** 159 + 1),
dht.our_contact.protocol
)
# Hit the nonresponding contact EVICTION_LIMIT times
# Which will trigger the eviction algorithm.
for _ in range(Constants.EVICTION_LIMIT):
bucket_list.add_contact(next_new_contact)
self.assertTrue(len(bucket_list.buckets[1].contacts) == 20,
"Expected 20 contacts in bucket 1.")
self.assertTrue(len(bucket_list.buckets[0].contacts) == 1,
f"Expected 1 contact in bucket 0, got {len(bucket_list.buckets[0].contacts)}.")
# Verify can_split -> pending eviction happened
self.assertTrue(len(dht.pending_contacts) == 0,
"Pending contact list should now be empty.")
self.assertFalse(non_responding_contact in bucket_list.contacts(),
"Expected bucket to NOT contain non-responding contact.")
self.assertTrue(next_new_contact in bucket_list.contacts(),
"Expected bucket to contain new contact.")
self.assertTrue(len(dht.eviction_count) == 0,
"Expected no contacts to be pending eviction.")
def test_non_responding_contact_delayed_eviction(self):
"""
Tests that a nonresponding contact puts the new contact into a pending list.
"""
dht = DHT(ID(0), VirtualProtocol(), storage_factory=VirtualStorage, router=Router())
bucket_list: BucketList = setup_split_failure(dht.node.bucket_list)
self.assertTrue(len(bucket_list.buckets) == 2,
"Bucket split should have occurred.")
self.assertTrue(len(bucket_list.buckets[0].contacts) == 1,
"Expected 1 contact in bucket 0.")
self.assertTrue(len(bucket_list.buckets[1].contacts) == 20,
"Expected 20 contacts in bucket 1.")
# The bucket is now full. pick the first contact,
# as it is last seen (they are added chronologically.)
non_responding_contact: Contact = bucket_list.buckets[1].contacts[0]
# Since the protocols are shared, we assign a unique protocol for this node for testing.
vp_unresponding = VirtualProtocol(
node=non_responding_contact.protocol.node,
responds=False
)
non_responding_contact.protocol = vp_unresponding
# set up the next new contact (it can respond.)
next_new_contact = Contact(
id=ID(2 ** 159 + 1),
protocol=dht.our_contact.protocol
)
bucket_list.add_contact(next_new_contact)
self.assertTrue(len(bucket_list.buckets[1].contacts) == 20,
f"Expecting 20 contacts in bucket 1, got {len(bucket_list.buckets[0].contacts)}")
self.assertTrue(len(bucket_list.buckets[0].contacts) == 1,
f"Expected 1 contact in bucket 0, got {len(bucket_list.buckets[0].contacts)}")
# Verify can_split -> Evict happened.
self.assertTrue(len(dht.pending_contacts) == 1,
"Expected one pending contact.")
self.assertTrue(next_new_contact in dht.pending_contacts,
"Expected pending contact to be the 21st contact.")
self.assertTrue(len(dht.eviction_count) == 1,
"Expected one contact to be pending eviction.")
class Chapter10Tests(unittest.TestCase):
def test_new_contact_gets_stored_contacts(self):
"""
Verify that we get stored values whose keys ^ contact ID are less than stored keys ^ other contacts.
There’s a lot of setup here for creating two existing contacts and two
key-values whose IDs have been specifically set. See the comments for
the XOR distance “math.”
"""
# Set up a node at the midpoint
# The existing node has the ID 10000...
existing: Node = Node(Contact(ID.mid(), None), VirtualStorage())
val_1 = "Value 1"
val_mid = "Value mid"
# The existing node stores the 2 items, one with an ID "hash" of 1, the other with ID.mid().
# Simple storage rather than executing the code for store.
existing.simply_store(ID(1), val_1)
existing.simply_store(ID.mid(), val_mid)
self.assertTrue(len(existing.storage.get_keys()) == 2,
f"Expected the existing node to have 2 key-values. {existing.storage.get_keys()}")
# Create a contact in the existing node's bucket list that is closer to one of the values.
# This contact has the prefix 0100000...
other_contact = Contact(ID(2 ** 158), None)
other = Node(other_contact, VirtualStorage())
existing.bucket_list.buckets[0].contacts.append(other_contact)
# The unseen contact has prefix 0110000...
unseen_vp = VirtualProtocol()
unseen_contact = Contact(ID(2 ** 158 + 2 ** 157), unseen_vp)
unseen = Node(unseen_contact, VirtualStorage())
unseen_vp.node = unseen # final fixup
self.assertTrue(len(unseen.storage.get_keys()) == 0, "The unseen node shouldn't have any key-values!")
# An unseen node pings, and we should get back val_mid only, as ID(1) ^ ID.mid() < ID.max() ^ ID.mid()
self.assertTrue(ID(1) ^ ID.mid() < ID.max() ^ ID.mid(), (f"Fundamental issue with ID class. "
f"\n{ID(ID(1) ^ ID.mid()).bin()} \nvs "
f"\n{ID(ID.max() ^ ID.mid()).bin()}"))
existing.ping(unseen_contact)
# Contacts V1 V2
# 1000000 00...0001 10...0000
# 0100000
# maths:
# c1 ^ v1 c1 ^ v2 c2 ^ v1 c2 ^ v2
# 100...001 000...000 010...001 110...000
# c1 ^ v1 > c1 ^ v2, so v1 doesn't get sent to the unseen node.
# c1 ^ v2 < c2 ^ v2, so it does get sent.
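# Added sanity check of the table above in plain integers
# (c1 = 2 ** 159, c2 = 2 ** 158, v1 key = 1, v2 key = 2 ** 159):
#   c1 ^ v1 == 2 ** 159 + 1    c1 ^ v2 == 0
#   c2 ^ v1 == 2 ** 158 + 1    c2 ^ v2 == 2 ** 159 + 2 ** 158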
self.assertTrue(
len(unseen.storage.get_keys()) == 1,
"Expected 1 value stored in our new node."
)
self.assertTrue(
unseen.storage.contains(ID.mid()),
"Expected val_mid to be stored."
)
self.assertTrue(
unseen.storage.get(ID.mid()) == val_mid,
f"Expected val_mid value to match, got {unseen.storage.get(ID.mid())}"
)
class DHTSerialisationTests(unittest.TestCase):
def test_serialisation(self):
dht: DHT = DHT(
id=ID.random_id(),
protocol=VirtualProtocol(),
router=Router(),