Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: rate limit peer exchange protocol, enhanced response status in RPC #3035

Merged
merged 9 commits into from
Sep 18, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ proc updateDiscv5BootstrapNodes(nodes: string, waku: ptr Waku): Result[void, str
proc performPeerExchangeRequestTo(
numPeers: uint64, waku: ptr Waku
): Future[Result[int, string]] {.async.} =
return await waku.node.fetchPeerExchangePeers(numPeers)
return (await waku.node.fetchPeerExchangePeers(numPeers)).isOkOr:
return err($error)

proc process*(
self: ptr DiscoveryRequest, waku: ptr Waku
Expand Down
1 change: 1 addition & 0 deletions tests/common/test_all.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ import
./test_parse_size,
./test_tokenbucket,
./test_requestratelimiter,
./test_ratelimit_setting,
./test_timed_map
165 changes: 165 additions & 0 deletions tests/common/test_ratelimit_setting.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# Chronos Test Suite
# (c) Copyright 2022-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)

{.used.}

import testutils/unittests
import chronos, libp2p/stream/connection
import std/[sequtils, options, tables]

import ../../waku/common/rate_limit/request_limiter
import ../../waku/common/rate_limit/timed_map

let proto = "ProtocolDescriptor"

let conn1 = Connection(peerId: PeerId.random().tryGet())
let conn2 = Connection(peerId: PeerId.random().tryGet())
let conn3 = Connection(peerId: PeerId.random().tryGet())

suite "RateLimitSetting":
test "Parse rate limit setting - ok":
let test1 = "10/2m"
let test2 = " store : 10 /1h"
let test2a = "storev2 : 10 /1h"
let test2b = "storeV3: 12 /1s"
let test3 = "LIGHTPUSH: 10/ 1m"
let test4 = "px:10/2 s "
let test5 = "filter:42/66ms"

let expU = UnlimitedRateLimit
let exp1: RateLimitSetting = (10, 2.minutes)
let exp2: RateLimitSetting = (10, 1.hours)
let exp2a: RateLimitSetting = (10, 1.hours)
let exp2b: RateLimitSetting = (12, 1.seconds)
let exp3: RateLimitSetting = (10, 1.minutes)
let exp4: RateLimitSetting = (10, 2.seconds)
let exp5: RateLimitSetting = (42, 66.milliseconds)

let res1 = ProtocolRateLimitSettings.parse(@[test1])
let res2 = ProtocolRateLimitSettings.parse(@[test2])
let res2a = ProtocolRateLimitSettings.parse(@[test2a])
let res2b = ProtocolRateLimitSettings.parse(@[test2b])
let res3 = ProtocolRateLimitSettings.parse(@[test3])
let res4 = ProtocolRateLimitSettings.parse(@[test4])
let res5 = ProtocolRateLimitSettings.parse(@[test5])

check:
res1.isOk()
res1.get() == {GLOBAL: exp1, FILTER: FilterDefaultPerPeerRateLimit}.toTable()
res2.isOk()
res2.get() ==
{
GLOBAL: expU,
FILTER: FilterDefaultPerPeerRateLimit,
STOREV2: exp2,
STOREV3: exp2,
}.toTable()
res2a.isOk()
res2a.get() ==
{GLOBAL: expU, FILTER: FilterDefaultPerPeerRateLimit, STOREV2: exp2a}.toTable()
res2b.isOk()
res2b.get() ==
{GLOBAL: expU, FILTER: FilterDefaultPerPeerRateLimit, STOREV3: exp2b}.toTable()
res3.isOk()
res3.get() ==
{GLOBAL: expU, FILTER: FilterDefaultPerPeerRateLimit, LIGHTPUSH: exp3}.toTable()
res4.isOk()
res4.get() ==
{GLOBAL: expU, FILTER: FilterDefaultPerPeerRateLimit, PEEREXCHG: exp4}.toTable()
res5.isOk()
res5.get() == {GLOBAL: expU, FILTER: exp5}.toTable()

test "Parse rate limit setting - err":
let test1 = "10/2d"
let test2 = " stre : 10 /1h"
let test2a = "storev2 10 /1h"
let test2b = "storev3: 12 1s"
let test3 = "somethingelse: 10/ 1m"
let test4 = ":px:10/2 s "
let test5 = "filter:p42/66ms"

let res1 = ProtocolRateLimitSettings.parse(@[test1])
let res2 = ProtocolRateLimitSettings.parse(@[test2])
let res2a = ProtocolRateLimitSettings.parse(@[test2a])
let res2b = ProtocolRateLimitSettings.parse(@[test2b])
let res3 = ProtocolRateLimitSettings.parse(@[test3])
let res4 = ProtocolRateLimitSettings.parse(@[test4])
let res5 = ProtocolRateLimitSettings.parse(@[test5])

check:
res1.isErr()
res2.isErr()
res2a.isErr()
res2b.isErr()
res3.isErr()
res4.isErr()
res5.isErr()

test "Parse rate limit setting - complex":
let expU = UnlimitedRateLimit

let test1 = @["lightpush:2/2ms", "10/2m", " store: 3/3s", " storev2:12/12s"]
let exp1 = {
GLOBAL: (10, 2.minutes),
FILTER: FilterDefaultPerPeerRateLimit,
LIGHTPUSH: (2, 2.milliseconds),
STOREV3: (3, 3.seconds),
STOREV2: (12, 12.seconds),
}.toTable()

let res1 = ProtocolRateLimitSettings.parse(test1)

check:
res1.isOk()
res1.get() == exp1
res1.get().getSetting(PEEREXCHG) == (10, 2.minutes)
res1.get().getSetting(STOREV2) == (12, 12.seconds)
res1.get().getSetting(STOREV3) == (3, 3.seconds)
res1.get().getSetting(LIGHTPUSH) == (2, 2.milliseconds)

let test2 = @["lightpush:2/2ms", " store: 3/3s", "px:10/10h", "filter:4/42ms"]
let exp2 = {
GLOBAL: expU,
LIGHTPUSH: (2, 2.milliseconds),
STOREV3: (3, 3.seconds),
STOREV2: (3, 3.seconds),
FILTER: (4, 42.milliseconds),
PEEREXCHG: (10, 10.hours),
}.toTable()

let res2 = ProtocolRateLimitSettings.parse(test2)

check:
res2.isOk()
res2.get() == exp2

let test3 =
@["storev2:1/1s", "store:3/3s", "storev3:4/42ms", "storev3:5/5s", "storev3:6/6s"]
let exp3 = {
GLOBAL: expU,
FILTER: FilterDefaultPerPeerRateLimit,
STOREV3: (6, 6.seconds),
STOREV2: (1, 1.seconds),
}.toTable()

let res3 = ProtocolRateLimitSettings.parse(test3)

check:
res3.isOk()
res3.get() == exp3
res3.get().getSetting(LIGHTPUSH) == expU

let test4 = newSeq[string](0)
let exp4 = {GLOBAL: expU, FILTER: FilterDefaultPerPeerRateLimit}.toTable()

let res4 = ProtocolRateLimitSettings.parse(test4)

check:
res4.isOk()
res4.get() == exp4
res3.get().getSetting(LIGHTPUSH) == expU
14 changes: 14 additions & 0 deletions tests/common/test_requestratelimiter.nim
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,17 @@ suite "RequestRateLimiter":
# requests of other peers can also go
check limiter.checkUsage(proto, conn2, now + 4100.milliseconds) == true
check limiter.checkUsage(proto, conn3, now + 5.minutes) == true

test "RequestRateLimiter lowest possible volume":
# keep limits low for easier calculation of ratios
let rateLimit: RateLimitSetting = (1, 1.seconds)
var limiter = newRequestRateLimiter(some(rateLimit))

let now = Moment.now()
# with first use we register the peer also and start its timer
check limiter.checkUsage(proto, conn1, now + 500.milliseconds) == true

# run out of main tokens but still used one more token from the peer's bucket
check limiter.checkUsage(proto, conn1, now + 800.milliseconds) == false
check limiter.checkUsage(proto, conn1, now + 1499.milliseconds) == false
check limiter.checkUsage(proto, conn1, now + 1501.milliseconds) == true
7 changes: 5 additions & 2 deletions tests/node/test_wakunode_peer_exchange.nim
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,18 @@ suite "Waku Peer Exchange":
# Then no peers are fetched
check:
node.peerManager.peerStore.peers.len == 0
res.error == "PeerExchange is not mounted"
res.error.status_code == SERVICE_UNAVAILABLE
res.error.status_desc == some("PeerExchange is not mounted")

asyncTest "Node fetches with mounted peer exchange, but no peers":
# Given a node with peer exchange mounted
await node.mountPeerExchange()

# When a node fetches peers
let res = await node.fetchPeerExchangePeers(1)
check res.error == "Peer exchange failure: peer_not_found_failure"
check:
res.error.status_code == SERVICE_UNAVAILABLE
res.error.status_desc == some("peer_not_found_failure")

# Then no peers are fetched
check node.peerManager.peerStore.peers.len == 0
Expand Down
2 changes: 1 addition & 1 deletion tests/waku_filter_v2/test_waku_filter_dos_protection.nim
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ suite "Waku Filter - DOS protection":
some(FilterSubscribeErrorKind.TOO_MANY_REQUESTS)

# ensure period of time has passed and clients can again use the service
await sleepAsync(700.milliseconds)
await sleepAsync(1000.milliseconds)
NagyZoltanPeter marked this conversation as resolved.
Show resolved Hide resolved
check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
Expand Down
67 changes: 62 additions & 5 deletions tests/waku_peer_exchange/test_protocol.nim
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
{.used.}

import
std/[options, sequtils, tables],
std/[options, sequtils, tables, net],
testutils/unittests,
chronos,
chronicles,
stew/shims/net,
libp2p/[switch, peerId, crypto/crypto, multistream, muxers/muxer],
eth/[keys, p2p/discoveryv5/enr]

Expand Down Expand Up @@ -223,6 +222,7 @@ suite "Waku Peer Exchange":
# Check that it failed gracefully
check:
response.isErr
response.error.status_code == PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE

asyncTest "Request 0 peers, with 0 peers in PeerExchange":
# Given a disconnected PeerExchange
Expand All @@ -237,7 +237,7 @@ suite "Waku Peer Exchange":
# Then the response should be an error
check:
response.isErr
response.error == "peer_not_found_failure"
response.error.status_code == PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE

asyncTest "Pool filtering":
let
Expand Down Expand Up @@ -331,7 +331,7 @@ suite "Waku Peer Exchange":
# Then the response should be an error
check:
response.isErr
response.error == "dial_failure"
response.error.status_code == PeerExchangeResponseStatusCode.DIAL_FAILURE

asyncTest "Connections are closed after response is sent":
# Create 3 nodes
Expand Down Expand Up @@ -385,7 +385,7 @@ suite "Waku Peer Exchange":
let conn = connOpt.get()

# Send bytes so that they directly hit the handler
let rpc = PeerExchangeRpc(request: PeerExchangeRequest(numPeers: 1))
let rpc = PeerExchangeRpc.makeRequest(1)

var buffer: seq[byte]
await conn.writeLP(rpc.encode().buffer)
Expand All @@ -397,5 +397,62 @@ suite "Waku Peer Exchange":

# Check we got back the enr we mocked
check:
decodedBuff.get().response.status_code == PeerExchangeResponseStatusCode.SUCCESS
decodedBuff.get().response.peerInfos.len == 1
decodedBuff.get().response.peerInfos[0].enr == enr1.raw

asyncTest "RateLimit as expected":
let
node1 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node2 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))

# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures(
[
node1.mountPeerExchange(rateLimit = (1, 150.milliseconds)),
node2.mountPeerExchange(),
]
)

# Create connection
let connOpt = await node2.peerManager.dialPeer(
node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec
)
require:
connOpt.isSome

# Create some enr and add to peer exchange (simulating disv5)
var enr1, enr2 = enr.Record()
check enr1.fromUri(
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
)
check enr2.fromUri(
"enr:-Iu4QGJllOWlviPIh_SGR-VVm55nhnBIU5L-s3ran7ARz_4oDdtJPtUs3Bc5aqZHCiPQX6qzNYF2ARHER0JPX97TFbEBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQP3ULycvday4EkvtVu0VqbBdmOkbfVLJx8fPe0lE_dRkIN0Y3CC6mCFd2FrdTIB"
)

# Mock that we have discovered these enrs
node1.wakuPeerExchange.enrCache.add(enr1)
node1.wakuPeerExchange.enrCache.add(enr2)

await sleepAsync(150.milliseconds)

# Request 2 peer from px. Test all request variants
let response1 = await node2.wakuPeerExchange.request(1)
check:
response1.isOk
response1.get().peerInfos.len == 1

let response2 =
await node2.wakuPeerExchange.request(1, node1.peerInfo.toRemotePeerInfo())
check:
response2.isErr
response2.error().status_code == PeerExchangeResponseStatusCode.TOO_MANY_REQUESTS

await sleepAsync(150.milliseconds)
let response3 = await node2.wakuPeerExchange.request(1, connOpt.get())
check:
response3.isOk
response3.get().peerInfos.len == 1
Loading
Loading