99// Redistribution and use in source and binary forms with or without
1010// modifications are permitted.
1111
12+ using Akka . Actor ;
1213using Akka . IO ;
1314using Akka . TestKit . MsTest ;
1415using Microsoft . VisualStudio . TestTools . UnitTesting ;
16+ using Neo ;
1517using Neo . Network . P2P ;
18+ using Neo . Network . P2P . Capabilities ;
19+ using Neo . Network . P2P . Payloads ;
20+ using Neo . UnitTests ;
1621using System ;
22+ using System . Collections . Concurrent ;
23+ using System . Collections . Generic ;
1724using System . Linq ;
1825using System . Net ;
26+ using System . Reflection ;
27+ using System . Runtime . Serialization ;
1928using System . Threading ;
2029
2130namespace Neo . UnitTests . Network . P2P
@@ -31,6 +40,36 @@ public void Init()
3140 _system = TestBlockchain . GetSystem ( ) ;
3241 }
3342
43+ [ TestMethod ]
44+ public void DisableCompressionCapabilityIsAdvertisedWhenCompressionDisabled ( )
45+ {
46+ var probe = CreateTestProbe ( ) ;
47+ var config = new ChannelsConfig { EnableCompression = false } ;
48+
49+ probe . Send ( _system . LocalNode , config ) ;
50+ probe . Send ( _system . LocalNode , new LocalNode . GetInstance ( ) ) ;
51+
52+ var localnode = probe . ExpectMsg < LocalNode > ( cancellationToken : CancellationToken . None ) ;
53+ var capabilities = localnode . GetNodeCapabilities ( ) ;
54+
55+ Assert . IsTrue ( capabilities . OfType < DisableCompressionCapability > ( ) . Any ( ) ) ;
56+ }
57+
58+ [ TestMethod ]
59+ public void DisableCompressionCapabilityIsOmittedWhenCompressionEnabled ( )
60+ {
61+ var probe = CreateTestProbe ( ) ;
62+ var config = new ChannelsConfig { EnableCompression = true } ;
63+
64+ probe . Send ( _system . LocalNode , config ) ;
65+ probe . Send ( _system . LocalNode , new LocalNode . GetInstance ( ) ) ;
66+
67+ var localnode = probe . ExpectMsg < LocalNode > ( cancellationToken : CancellationToken . None ) ;
68+ var capabilities = localnode . GetNodeCapabilities ( ) ;
69+
70+ Assert . IsFalse ( capabilities . OfType < DisableCompressionCapability > ( ) . Any ( ) ) ;
71+ }
72+
3473 [ TestMethod ]
3574 public void TestDefaults ( )
3675 {
@@ -64,5 +103,204 @@ public void ProcessesTcpConnectedAfterConfigArrives()
64103
65104 connectionProbe . ExpectMsg < Tcp . Register > ( TimeSpan . FromSeconds ( 1 ) , cancellationToken : CancellationToken . None ) ;
66105 }
106+
107+ [ TestMethod ]
108+ public void RespectsMaxConnectionsPerAddress ( )
109+ {
110+ var firstConnection = CreateTestProbe ( ) ;
111+ var secondConnection = CreateTestProbe ( ) ;
112+ var remote = new IPEndPoint ( IPAddress . Parse ( "203.0.113.5" ) , 20000 ) ;
113+ var local = new IPEndPoint ( IPAddress . Loopback , 20001 ) ;
114+
115+ var configProbe = CreateTestProbe ( ) ;
116+ configProbe . Send ( _system . LocalNode , new ChannelsConfig { MaxConnectionsPerAddress = 1 } ) ;
117+
118+ firstConnection . Send ( _system . LocalNode , new Tcp . Connected ( remote , local ) ) ;
119+ firstConnection . ExpectMsg < Tcp . Register > ( TimeSpan . FromSeconds ( 1 ) , cancellationToken : CancellationToken . None ) ;
120+
121+ secondConnection . Send ( _system . LocalNode , new Tcp . Connected ( remote , local ) ) ;
122+ secondConnection . ExpectMsg < Tcp . Abort > ( TimeSpan . FromSeconds ( 1 ) , cancellationToken : CancellationToken . None ) ;
123+ }
124+
125+ [ TestMethod ]
126+ public void DoesNotAddAlreadyConnectedPeerToUnconnected ( )
127+ {
128+ var configProbe = CreateTestProbe ( ) ;
129+ configProbe . Send ( _system . LocalNode , new ChannelsConfig ( ) ) ;
130+
131+ var remote = new IPEndPoint ( IPAddress . Parse ( "198.51.100.50" ) , 21001 ) ;
132+ var local = new IPEndPoint ( IPAddress . Loopback , 21002 ) ;
133+ var connectionProbe = CreateTestProbe ( ) ;
134+ connectionProbe . Send ( _system . LocalNode , new Tcp . Connected ( remote , local ) ) ;
135+ connectionProbe . ExpectMsg < Tcp . Register > ( TimeSpan . FromSeconds ( 1 ) , cancellationToken : CancellationToken . None ) ;
136+
137+ var another = new IPEndPoint ( IPAddress . Parse ( "203.0.113.10" ) , 21003 ) ;
138+ configProbe . Send ( _system . LocalNode , new Peer . Peers { EndPoints = new [ ] { remote , another } } ) ;
139+ configProbe . Send ( _system . LocalNode , new LocalNode . GetInstance ( ) ) ;
140+ var localnode = configProbe . ExpectMsg < LocalNode > ( TimeSpan . FromSeconds ( 1 ) , cancellationToken : CancellationToken . None ) ;
141+
142+ bool containsAnother = SpinWait . SpinUntil (
143+ ( ) => localnode . GetUnconnectedPeers ( ) . Contains ( another ) ,
144+ TimeSpan . FromMilliseconds ( 500 ) ) ;
145+
146+ var unconnected = localnode . GetUnconnectedPeers ( ) . ToArray ( ) ;
147+ CollectionAssert . DoesNotContain ( unconnected , remote ) ;
148+ Assert . IsTrue ( containsAnother , "Expected unconnected peers to include the new endpoint." ) ;
149+ }
150+
151+ [ TestMethod ]
152+ public void RelayDirectly_ForwardsNonBlockInventoryToAllRemotes ( )
153+ {
154+ var localNodeRef = ActorOfAsTestActorRef ( ( ) => new LocalNode ( _system ) ) ;
155+ localNodeRef . Tell ( new ChannelsConfig ( ) ) ;
156+
157+ var instanceProbe = CreateTestProbe ( ) ;
158+ instanceProbe . Send ( localNodeRef , new LocalNode . GetInstance ( ) ) ;
159+ var localNode = instanceProbe . ExpectMsg < LocalNode > ( cancellationToken : CancellationToken . None ) ;
160+
161+ var remoteOne = CreateTestProbe ( ) ;
162+ var remoteTwo = CreateTestProbe ( ) ;
163+ AddRemote ( localNode , remoteOne . Ref , 0 ) ;
164+ AddRemote ( localNode , remoteTwo . Ref , 0 ) ;
165+
166+ var tx = TestUtils . GetTransaction ( UInt160 . Zero ) ;
167+ localNodeRef . Tell ( new LocalNode . RelayDirectly { Inventory = tx } ) ;
168+
169+ remoteOne . ExpectMsg < RemoteNode . Relay > ( msg => ReferenceEquals ( tx , msg . Inventory ) , cancellationToken : CancellationToken . None ) ;
170+ remoteTwo . ExpectMsg < RemoteNode . Relay > ( msg => ReferenceEquals ( tx , msg . Inventory ) , cancellationToken : CancellationToken . None ) ;
171+
172+ localNode . RemoteNodes . Clear ( ) ;
173+ ClearConnections ( localNode ) ;
174+ Sys . Stop ( localNodeRef ) ;
175+ }
176+
177+ [ TestMethod ]
178+ public void RelayDirectly_SendsBlocksOnlyToLowerHeightRemotes ( )
179+ {
180+ var localNodeRef = ActorOfAsTestActorRef ( ( ) => new LocalNode ( _system ) ) ;
181+ localNodeRef . Tell ( new ChannelsConfig ( ) ) ;
182+
183+ var instanceProbe = CreateTestProbe ( ) ;
184+ instanceProbe . Send ( localNodeRef , new LocalNode . GetInstance ( ) ) ;
185+ var localNode = instanceProbe . ExpectMsg < LocalNode > ( cancellationToken : CancellationToken . None ) ;
186+
187+ var remoteLow = CreateTestProbe ( ) ;
188+ var remoteHigh = CreateTestProbe ( ) ;
189+ AddRemote ( localNode , remoteLow . Ref , 1 ) ;
190+ AddRemote ( localNode , remoteHigh . Ref , 5 ) ;
191+
192+ var block = new Block
193+ {
194+ Header = new Header
195+ {
196+ Index = 3 ,
197+ Version = 0 ,
198+ PrevHash = UInt256 . Zero ,
199+ MerkleRoot = UInt256 . Zero ,
200+ Timestamp = 0 ,
201+ NextConsensus = UInt160 . Zero ,
202+ Witness = Witness . Empty ,
203+ } ,
204+ Transactions = Array . Empty < Transaction > ( )
205+ } ;
206+
207+ localNodeRef . Tell ( new LocalNode . RelayDirectly { Inventory = block } ) ;
208+
209+ remoteLow . ExpectMsg < RemoteNode . Relay > ( msg => ReferenceEquals ( block , msg . Inventory ) , cancellationToken : CancellationToken . None ) ;
210+ #pragma warning disable MSTEST0049
211+ remoteHigh . ExpectNoMsg ( TimeSpan . FromMilliseconds ( 100 ) ) ;
212+ #pragma warning restore MSTEST0049
213+
214+ localNode . RemoteNodes . Clear ( ) ;
215+ ClearConnections ( localNode ) ;
216+ Sys . Stop ( localNodeRef ) ;
217+ }
218+
219+ [ TestMethod ]
220+ public void SendDirectly_PublishesInventoryWithoutWrapping ( )
221+ {
222+ var localNodeRef = ActorOfAsTestActorRef ( ( ) => new LocalNode ( _system ) ) ;
223+ localNodeRef . Tell ( new ChannelsConfig ( ) ) ;
224+
225+ var instanceProbe = CreateTestProbe ( ) ;
226+ instanceProbe . Send ( localNodeRef , new LocalNode . GetInstance ( ) ) ;
227+ var localNode = instanceProbe . ExpectMsg < LocalNode > ( cancellationToken : CancellationToken . None ) ;
228+
229+ var remote = CreateTestProbe ( ) ;
230+ AddRemote ( localNode , remote . Ref , 0 ) ;
231+
232+ var tx = TestUtils . GetTransaction ( UInt160 . Zero ) ;
233+ localNodeRef . Tell ( new LocalNode . SendDirectly { Inventory = tx } ) ;
234+
235+ remote . ExpectMsg < Transaction > ( t => ReferenceEquals ( tx , t ) , cancellationToken : CancellationToken . None ) ;
236+
237+ localNode . RemoteNodes . Clear ( ) ;
238+ ClearConnections ( localNode ) ;
239+ Sys . Stop ( localNodeRef ) ;
240+ }
241+
242+ [ TestMethod ]
243+ public void NeedMorePeers_BroadcastsGetAddrWhenConnected ( )
244+ {
245+ var localNodeRef = ActorOfAsTestActorRef ( ( ) => new LocalNode ( _system ) ) ;
246+ localNodeRef . Tell ( new ChannelsConfig ( ) ) ;
247+
248+ var instanceProbe = CreateTestProbe ( ) ;
249+ instanceProbe . Send ( localNodeRef , new LocalNode . GetInstance ( ) ) ;
250+ var localNode = instanceProbe . ExpectMsg < LocalNode > ( cancellationToken : CancellationToken . None ) ;
251+
252+ var remoteProbe = CreateTestProbe ( ) ;
253+ AddRemote ( localNode , remoteProbe . Ref , 0 ) ;
254+ AddConnectedPeer ( localNode , remoteProbe . Ref , new IPEndPoint ( IPAddress . Parse ( "198.51.100.1" ) , 3000 ) ) ;
255+
256+ TriggerTimer ( localNodeRef ) ;
257+
258+ remoteProbe . ExpectMsg < Message > ( msg => msg . Command == MessageCommand . GetAddr , cancellationToken : CancellationToken . None ) ;
259+
260+ localNode . RemoteNodes . Clear ( ) ;
261+ ClearConnections ( localNode ) ;
262+ Sys . Stop ( localNodeRef ) ;
263+ }
264+
265+ private static void AddRemote ( LocalNode localNode , IActorRef actor , uint lastBlockIndex )
266+ {
267+ #pragma warning disable SYSLIB0050
268+ var remote = ( RemoteNode ) FormatterServices . GetUninitializedObject ( typeof ( RemoteNode ) ) ;
269+ #pragma warning restore SYSLIB0050
270+ var field = typeof ( RemoteNode ) . GetField ( "<LastBlockIndex>k__BackingField" , BindingFlags . Instance | BindingFlags . NonPublic ) ;
271+ field ? . SetValue ( remote , lastBlockIndex ) ;
272+ localNode . RemoteNodes . TryAdd ( actor , remote ) ;
273+ }
274+
275+ private static void AddConnectedPeer ( LocalNode localNode , IActorRef actor , IPEndPoint endpoint )
276+ {
277+ var connectedPeersField = typeof ( Peer ) . GetField ( "ConnectedPeers" , BindingFlags . Instance | BindingFlags . NonPublic ) ;
278+ var connectedPeers = ( ConcurrentDictionary < IActorRef , IPEndPoint > ) connectedPeersField ! . GetValue ( localNode ) ;
279+ connectedPeers . TryAdd ( actor , endpoint ) ;
280+
281+ var connectedAddressesField = typeof ( Peer ) . GetField ( "ConnectedAddresses" , BindingFlags . Instance | BindingFlags . NonPublic ) ;
282+ var connectedAddresses = ( Dictionary < IPAddress , int > ) connectedAddressesField ! . GetValue ( localNode ) ;
283+ connectedAddresses . TryGetValue ( endpoint . Address , out var count ) ;
284+ connectedAddresses [ endpoint . Address ] = count + 1 ;
285+ }
286+
287+ private static void ClearConnections ( LocalNode localNode )
288+ {
289+ var connectedPeersField = typeof ( Peer ) . GetField ( "ConnectedPeers" , BindingFlags . Instance | BindingFlags . NonPublic ) ;
290+ var connectedPeers = ( ConcurrentDictionary < IActorRef , IPEndPoint > ) connectedPeersField ! . GetValue ( localNode ) ;
291+ connectedPeers . Clear ( ) ;
292+
293+ var connectedAddressesField = typeof ( Peer ) . GetField ( "ConnectedAddresses" , BindingFlags . Instance | BindingFlags . NonPublic ) ;
294+ var connectedAddresses = ( Dictionary < IPAddress , int > ) connectedAddressesField ! . GetValue ( localNode ) ;
295+ connectedAddresses . Clear ( ) ;
296+ }
297+
298+ private static void TriggerTimer ( IActorRef peer )
299+ {
300+ var timerType = typeof ( Peer ) . GetNestedType ( "Timer" , BindingFlags . NonPublic | BindingFlags . Instance ) ;
301+ Assert . IsNotNull ( timerType , "Peer.Timer type not found via reflection." ) ;
302+ var timer = Activator . CreateInstance ( timerType ! ) ;
303+ peer . Tell ( timer ! ) ;
304+ }
67305 }
68306}
0 commit comments