Skip to content

Commit

Permalink
Refactored RateLimit configuration option for better CLI UX - now pos…
Browse files Browse the repository at this point in the history
…sible to set separate limits per protocol. Adjusted mountings. Added and adjusted tests
  • Loading branch information
NagyZoltanPeter committed Sep 13, 2024
1 parent a9324fa commit 210332a
Show file tree
Hide file tree
Showing 13 changed files with 401 additions and 45 deletions.
1 change: 1 addition & 0 deletions tests/common/test_all.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ import
./test_parse_size,
./test_tokenbucket,
./test_requestratelimiter,
./test_ratelimit_setting,
./test_timed_map
150 changes: 150 additions & 0 deletions tests/common/test_ratelimit_setting.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
# Chronos Test Suite
# (c) Copyright 2022-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)

{.used.}

import testutils/unittests
import chronos, libp2p/stream/connection
import std/[sequtils, options, tables]

import ../../waku/common/rate_limit/request_limiter
import ../../waku/common/rate_limit/timed_map

let proto = "ProtocolDescriptor"

let conn1 = Connection(peerId: PeerId.random().tryGet())
let conn2 = Connection(peerId: PeerId.random().tryGet())
let conn3 = Connection(peerId: PeerId.random().tryGet())

suite "RateLimitSetting":
test "Parse rate limit setting - ok":
let test1 = "10/2m"
let test2 = " store : 10 /1h"
let test2a = "storev2 : 10 /1h"
let test2b = "storeV3: 12 /1s"
let test3 = "LIGHTPUSH: 10/ 1m"
let test4 = "px:10/2 s "
let test5 = "filter:42/66ms"

let expU = UnlimitedRateLimit
let exp1: RateLimitSetting = (10, 2.minutes)
let exp2: RateLimitSetting = (10, 1.hours)
let exp2a: RateLimitSetting = (10, 1.hours)
let exp2b: RateLimitSetting = (12, 1.seconds)
let exp3: RateLimitSetting = (10, 1.minutes)
let exp4: RateLimitSetting = (10, 2.seconds)
let exp5: RateLimitSetting = (42, 66.milliseconds)

let res1 = ProtocolRateLimitSettings.parse(@[test1])
let res2 = ProtocolRateLimitSettings.parse(@[test2])
let res2a = ProtocolRateLimitSettings.parse(@[test2a])
let res2b = ProtocolRateLimitSettings.parse(@[test2b])
let res3 = ProtocolRateLimitSettings.parse(@[test3])
let res4 = ProtocolRateLimitSettings.parse(@[test4])
let res5 = ProtocolRateLimitSettings.parse(@[test5])

check:
res1.isOk()
res1.get() == {GLOBAL: exp1}.toTable()
res2.isOk()
res2.get() == {GLOBAL: expU, STOREV2: exp2, STOREV3: exp2}.toTable()
res2a.isOk()
res2a.get() == {GLOBAL: expU, STOREV2: exp2a}.toTable()
res2b.isOk()
res2b.get() == {GLOBAL: expU, STOREV3: exp2b}.toTable()
res3.isOk()
res3.get() == {GLOBAL: expU, LIGHTPUSH: exp3}.toTable()
res4.isOk()
res4.get() == {GLOBAL: expU, PEEREXCHG: exp4}.toTable()
res5.isOk()
res5.get() == {GLOBAL: expU, FILTER: exp5}.toTable()

test "Parse rate limit setting - err":
let test1 = "10/2d"
let test2 = " stre : 10 /1h"
let test2a = "storev2 10 /1h"
let test2b = "storev3: 12 1s"
let test3 = "somethingelse: 10/ 1m"
let test4 = ":px:10/2 s "
let test5 = "filter:p42/66ms"

let res1 = ProtocolRateLimitSettings.parse(@[test1])
let res2 = ProtocolRateLimitSettings.parse(@[test2])
let res2a = ProtocolRateLimitSettings.parse(@[test2a])
let res2b = ProtocolRateLimitSettings.parse(@[test2b])
let res3 = ProtocolRateLimitSettings.parse(@[test3])
let res4 = ProtocolRateLimitSettings.parse(@[test4])
let res5 = ProtocolRateLimitSettings.parse(@[test5])

check:
res1.isErr()
res2.isErr()
res2a.isErr()
res2b.isErr()
res3.isErr()
res4.isErr()
res5.isErr()

test "Parse rate limit setting - complex":
let expU = UnlimitedRateLimit

let test1 = @["lightpush:2/2ms", "10/2m", " store: 3/3s", " storev2:12/12s"]
let exp1 = {
GLOBAL: (10, 2.minutes),
LIGHTPUSH: (2, 2.milliseconds),
STOREV3: (3, 3.seconds),
STOREV2: (12, 12.seconds),
}.toTable()

let res1 = ProtocolRateLimitSettings.parse(test1)

check:
res1.isOk()
res1.get() == exp1
res1.get().getSetting(PEEREXCHG) == (10, 2.minutes)
res1.get().getSetting(STOREV2) == (12, 12.seconds)
res1.get().getSetting(STOREV3) == (3, 3.seconds)
res1.get().getSetting(LIGHTPUSH) == (2, 2.milliseconds)

let test2 = @["lightpush:2/2ms", " store: 3/3s", "px:10/10h", "filter:4/42ms"]
let exp2 = {
GLOBAL: expU,
LIGHTPUSH: (2, 2.milliseconds),
STOREV3: (3, 3.seconds),
STOREV2: (3, 3.seconds),
FILTER: (4, 42.milliseconds),
PEEREXCHG: (10, 10.hours),
}.toTable()

let res2 = ProtocolRateLimitSettings.parse(test2)

check:
res2.isOk()
res2.get() == exp2

let test3 =
@["storev2:1/1s", "store:3/3s", "storev3:4/42ms", "storev3:5/5s", "storev3:6/6s"]
let exp3 =
{GLOBAL: expU, STOREV3: (6, 6.seconds), STOREV2: (1, 1.seconds)}.toTable()

let res3 = ProtocolRateLimitSettings.parse(test3)

check:
res3.isOk()
res3.get() == exp3
res3.get().getSetting(LIGHTPUSH) == expU

let test4 = newSeq[string](0)
let exp4 = {GLOBAL: expU}.toTable()

let res4 = ProtocolRateLimitSettings.parse(test4)

check:
res4.isOk()
res4.get() == exp4
res3.get().getSetting(LIGHTPUSH) == expU
14 changes: 14 additions & 0 deletions tests/common/test_requestratelimiter.nim
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,17 @@ suite "RequestRateLimiter":
# requests of other peers can also go
check limiter.checkUsage(proto, conn2, now + 4100.milliseconds) == true
check limiter.checkUsage(proto, conn3, now + 5.minutes) == true

test "RequestRateLimiter lowest possible volume":
# keep limits low for easier calculation of ratios
let rateLimit: RateLimitSetting = (1, 1.seconds)
var limiter = newRequestRateLimiter(some(rateLimit))

let now = Moment.now()
# with first use we register the peer also and start its timer
check limiter.checkUsage(proto, conn1, now + 500.milliseconds) == true

# run out of main tokens but still used one more token from the peer's bucket
check limiter.checkUsage(proto, conn1, now + 800.milliseconds) == false
check limiter.checkUsage(proto, conn1, now + 1499.milliseconds) == false
check limiter.checkUsage(proto, conn1, now + 1501.milliseconds) == true
7 changes: 5 additions & 2 deletions tests/node/test_wakunode_peer_exchange.nim
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,18 @@ suite "Waku Peer Exchange":
# Then no peers are fetched
check:
node.peerManager.peerStore.peers.len == 0
res.error == "PeerExchange is not mounted"
res.error.status == SERVICE_UNAVAILABLE
res.error.desc == some("PeerExchange is not mounted")

asyncTest "Node fetches with mounted peer exchange, but no peers":
# Given a node with peer exchange mounted
await node.mountPeerExchange()

# When a node fetches peers
let res = await node.fetchPeerExchangePeers(1)
check res.error == "Peer exchange failure: peer_not_found_failure"
check:
res.error.status == SERVICE_UNAVAILABLE
res.error.desc == some("peer_not_found_failure")

# Then no peers are fetched
check node.peerManager.peerStore.peers.len == 0
Expand Down
56 changes: 56 additions & 0 deletions tests/waku_peer_exchange/test_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -403,3 +403,59 @@ suite "Waku Peer Exchange":
decodedBuff.get().response.isSome()
decodedBuff.get().response.get().peerInfos.len == 1
decodedBuff.get().response.get().peerInfos[0].enr == enr1.raw

asyncTest "RateLimit as expected":
let
node1 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node2 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))

# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures(
[
node1.mountPeerExchange(rateLimit = (1, 150.milliseconds)),
node2.mountPeerExchange(),
]
)

# Create connection
let connOpt = await node2.peerManager.dialPeer(
node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec
)
require:
connOpt.isSome

# Create some enr and add to peer exchange (simulating disv5)
var enr1, enr2 = enr.Record()
check enr1.fromUri(
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
)
check enr2.fromUri(
"enr:-Iu4QGJllOWlviPIh_SGR-VVm55nhnBIU5L-s3ran7ARz_4oDdtJPtUs3Bc5aqZHCiPQX6qzNYF2ARHER0JPX97TFbEBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQP3ULycvday4EkvtVu0VqbBdmOkbfVLJx8fPe0lE_dRkIN0Y3CC6mCFd2FrdTIB"
)

# Mock that we have discovered these enrs
node1.wakuPeerExchange.enrCache.add(enr1)
node1.wakuPeerExchange.enrCache.add(enr2)

await sleepAsync(150.milliseconds)

# Request 2 peer from px. Test all request variants
let response1 = await node2.wakuPeerExchange.request(1)
check:
response1.isOk
response1.get().peerInfos.len == 1

let response2 =
await node2.wakuPeerExchange.request(1, node1.peerInfo.toRemotePeerInfo())
check:
response2.isErr
response2.error().status == PeerExchangeResponseStatusCode.TOO_MANY_REQUESTS

await sleepAsync(150.milliseconds)
let response3 = await node2.wakuPeerExchange.request(1, connOpt.get())
check:
response3.isOk
response3.get().peerInfos.len == 1
118 changes: 117 additions & 1 deletion waku/common/rate_limit/setting.nim
Original file line number Diff line number Diff line change
@@ -1,12 +1,35 @@
{.push raises: [].}

import chronos/timer
import chronos/timer, std/[tables, strutils, options], regex, results

# Setting for TokenBucket defined as volume over period of time
type RateLimitSetting* = tuple[volume: int, period: Duration]

type RateLimitedProtocol* = enum
GLOBAL
STOREV2
STOREV3
LIGHTPUSH
PEEREXCHG
FILTER

type ProtocolRateLimitSettings* = Table[RateLimitedProtocol, RateLimitSetting]
type ProtocolRateLimit = tuple[protocol: RateLimitedProtocol, setting: RateLimitSetting]

# Set the default to switch off rate limiting for now
let DefaultGlobalNonRelayRateLimit*: RateLimitSetting = (0, 0.minutes)
let UnlimitedRateLimit*: RateLimitSetting = (0, 0.seconds)

# Acceptable call frequence from one peer using filter service
# Assumption is having to set up a subscription with max 30 calls than using ping in every min
# While subscribe/unsubscribe events are distributed in time among clients, pings will happen regularly from
# all subscribed peers
let FilterDefaultPerPeerRateLimit*: RateLimitSetting = (30, 1.minutes)

# For being used under GC-safe condition must use threadvar
var DefaultProtocolRateLimit* {.threadvar.}: ProtocolRateLimitSettings
DefaultProtocolRateLimit =
{GLOBAL: UnlimitedRateLimit, FILTER: FilterDefaultPerPeerRateLimit}.toTable()

proc isUnlimited*(t: RateLimitSetting): bool {.inline.} =
return t.volume <= 0 or t.period <= 0.seconds
Expand All @@ -17,3 +40,96 @@ func `$`*(t: RateLimitSetting): string {.inline.} =
"no-limit"
else:
$t.volume & "/" & $t.period

proc translate(sProtocol: string): RateLimitedProtocol =
if sProtocol.len == 0:
return GLOBAL

case sProtocol
of "global":
return GLOBAL
of "storev2":
return STOREV2
of "storev3":
return STOREV3
of "lightpush":
return LIGHTPUSH
of "px":
return PEEREXCHG
of "filter":
return FILTER

proc fillSettingTable(
t: var ProtocolRateLimitSettings, sProtocol: var string, setting: RateLimitSetting
) =
let protocol = translate(sProtocol)

if sProtocol == "store":
# generic store will only applies to version which is not listed directly
discard t.hasKeyOrPut(STOREV2, setting)
discard t.hasKeyOrPut(STOREV3, setting)
else:
# always overrides, last one wins if same protocol duplicated
t[protocol] = setting

proc parse*(
T: type ProtocolRateLimitSettings, settings: seq[string]
): Result[ProtocolRateLimitSettings, string] =
var settingsTable: ProtocolRateLimitSettings =
initTable[RateLimitedProtocol, RateLimitSetting]()

## Following regex can match the exact syntax of how rate limit can be set for different protocol or global.
## It uses capture groups
## group0: Will be check if protocol name is followed by a colon but only if protocol name is set.
## group1: Protocol name, if empty we take it as "global" setting
## group2: Volume of tokens - only integer
## group3: Duration of period - only integer
## group4: Unit of period - only h:hour, m:minute, s:second, ms:millisecond allowed
## whitespaces are allowed lazily
const parseRegex =
"""^\s*((store|storev2|storev3|lightpush|px|filter)\s*:)?\s*(\d+)\s*\/\s*(\d+)\s*(s|h|m|ms)\s*$"""
const regexParseSize = re2(parseRegex)
for settingStr in settings:
let aSetting = settingStr.toLower()
try:
var m: RegexMatch2
if aSetting.match(regexParseSize, m) == false:
return err("Invalid rate-limit setting: " & settingStr)

var sProtocol = aSetting[m.captures[1]]
let volume = aSetting[m.captures[2]].parseInt()
let duration = aSetting[m.captures[3]].parseInt()
let periodUnit = aSetting[m.captures[4]]

var period = 0.seconds
case periodUnit
of "ms":
period = duration.milliseconds
of "s":
period = duration.seconds
of "m":
period = duration.minutes
of "h":
period = duration.hours

fillSettingTable(settingsTable, sProtocol, (volume, period))
except ValueError:
return err("Invalid rate-limit setting: " & settingStr)

# If there were no global setting predefined, we set unlimited
# due it is taken for protocols not defined in the list - thus those will not apply accidentally wrong settings.
discard settingsTable.hasKeyOrPut(GLOBAL, UnlimitedRateLimit)
discard settingsTable.hasKeyOrPut(FILTER, FilterDefaultPerPeerRateLimit)

return ok(settingsTable)

proc parse*(
T: type ProtocolRateLimitSettings, settings: string
): Result[ProtocolRateLimitSettings, string] =
return ok(settingsTable)

proc getSetting*(
t: ProtocolRateLimitSettings, protocol: RateLimitedProtocol
): RateLimitSetting =
let default = t.getOrDefault(GLOBAL, UnlimitedRateLimit)
return t.getOrDefault(protocol, default)
Loading

0 comments on commit 210332a

Please sign in to comment.