Skip to content

Commit

Permalink
feat: Added simple, configurable rate limit for lightpush and store-q…
Browse files Browse the repository at this point in the history
…uery

Adjust lightpush rest response to rate limit, added tests ann some fixes

Add rest store query test for rate limit checks and proper error response

Update apps/wakunode2/external_config.nim

Co-authored-by: gabrielmer <[email protected]>

Fixing typos and PR comments

Fix tests

Move chronos/tokenbucket to nwaku codebasee with limited and fixed feature set

Add meterics to lightpush rate limits
  • Loading branch information
NagyZoltanPeter committed Mar 25, 2024
1 parent 7aea2d4 commit 67339fd
Show file tree
Hide file tree
Showing 21 changed files with 667 additions and 23 deletions.
6 changes: 4 additions & 2 deletions tests/waku_lightpush/lightpush_utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import
../testlib/[common, wakucore]

proc newTestWakuLightpushNode*(
switch: Switch, handler: PushMessageHandler
switch: Switch,
handler: PushMessageHandler,
rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): Future[WakuLightPush] {.async.} =
let
peerManager = PeerManager.new(switch)
proto = WakuLightPush.new(peerManager, rng, handler)
proto = WakuLightPush.new(peerManager, rng, handler, rateLimitSetting)

await proto.start()
switch.mount(proto)
Expand Down
2 changes: 1 addition & 1 deletion tests/waku_lightpush/test_all.nim
Original file line number Diff line number Diff line change
@@ -1 +1 @@
import ./test_client
import ./test_client, ./test_ratelimit
2 changes: 1 addition & 1 deletion tests/waku_lightpush/test_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ suite "Waku Lightpush Client":
# 1KiB
message2 = fakeWakuMessage(
contentTopic = contentTopic, payload = getByteSequence(10 * 1024)
) # 10KiB
) # 10KiB
message3 = fakeWakuMessage(
contentTopic = contentTopic, payload = getByteSequence(100 * 1024)
) # 100KiB
Expand Down
145 changes: 145 additions & 0 deletions tests/waku_lightpush/test_ratelimit.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
{.used.}

import
std/[options, strscans],
testutils/unittests,
chronicles,
chronos,
libp2p/crypto/crypto

import
../../../waku/[
node/peer_manager,
waku_core,
waku_lightpush,
waku_lightpush/client,
waku_lightpush/common,
waku_lightpush/protocol_metrics,
waku_lightpush/rpc,
waku_lightpush/rpc_codec,
],
../testlib/[assertions, wakucore, testasync, futures, testutils],
./lightpush_utils,
../resources/[pubsub_topics, content_topics, payloads]

suite "Rate limited push service":
asyncTest "push message with rate limit not violated":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()

await allFutures(serverSwitch.start(), clientSwitch.start())

## Given
var handlerFuture = newFuture[(string, WakuMessage)]()
let handler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
handlerFuture.complete((pubsubTopic, message))
return ok()

let
tokenPeriod = 500.millis
server =
await newTestWakuLightpushNode(serverSwitch, handler, some((3, tokenPeriod)))
client = newTestWakuLightpushClient(clientSwitch)

let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()
let topic = DefaultPubsubTopic

let sendMsgProc = proc(): Future[void] {.async.} =
let message = fakeWakuMessage()

handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes = await client.publish(topic, message, peer = serverPeerId)

require await handlerFuture.withTimeout(50.millis)

check:
requestRes.isOk()
handlerFuture.finished()

let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read()
check:
handledMessagePubsubTopic == topic
handledMessage == message

let waitInBetweenFor = 20.millis

for runCnt in 0 ..< 3:
let startTime = Moment.now()

for testCnt in 0 ..< 3:
await sendMsgProc()
await sleepAsync(20.millis)

let endTime = Moment.now()
let elapsed: Duration = (endTime - startTime)
await sleepAsync(tokenPeriod - elapsed)
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())

asyncTest "push message with rate limit reject":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()

await allFutures(serverSwitch.start(), clientSwitch.start())

## Given
var handlerFuture = newFuture[(string, WakuMessage)]()
let handler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
handlerFuture.complete((pubsubTopic, message))
return ok()

let
server =
await newTestWakuLightpushNode(serverSwitch, handler, some((3, 1000.millis)))
client = newTestWakuLightpushClient(clientSwitch)

let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()
let topic = DefaultPubsubTopic

let successProc = proc(): Future[void] {.async.} =
let message = fakeWakuMessage()
handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
discard await handlerFuture.withTimeout(10.millis)

check:
requestRes.isOk()
handlerFuture.finished()
let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read()
check:
handledMessagePubsubTopic == DefaultPubsubTopic
handledMessage == message

let rejectProc = proc(): Future[void] {.async.} =
let message = fakeWakuMessage()
handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
discard await handlerFuture.withTimeout(10.millis)

check:
requestRes.isErr()
requestRes.error == "TOO_MANY_REQUESTS"

for testCnt in 0 .. 2:
await successProc()
await sleepAsync(20.millis)

await rejectProc()

await sleepAsync(1000.millis)

## next one shall succeed due to the rate limit time window has passed
await successProc()

## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
94 changes: 94 additions & 0 deletions tests/waku_store/test_wakunode_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -318,3 +318,97 @@ procSuite "WakuNode - Store":

# Cleanup
waitFor allFutures(client.stop(), server.stop())

test "Store protocol queries does not violate request rate limitation":
## Setup
let
serverKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))
clientKey = generateSecp256k1Key()
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0))

waitFor allFutures(client.start(), server.start())

let mountArchiveRes = server.mountArchive(archiveA)
assert mountArchiveRes.isOk(), mountArchiveRes.error

waitFor server.mountStore((4, 1.seconds))

client.mountStoreClient()

## Given
let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
let serverPeer = server.peerInfo.toRemotePeerInfo()

let requestProc = proc() {.async.} =
let queryRes = waitFor client.query(req, peer = serverPeer)

check queryRes.isOk()

let response = queryRes.get()
check:
response.messages == msgListA

for count in 0 ..< 4:
waitFor requestProc()
waitFor sleepAsync(20.millis)

waitFor sleepAsync(1.seconds)

for count in 0 ..< 4:
waitFor requestProc()
waitFor sleepAsync(20.millis)

# Cleanup
waitFor allFutures(client.stop(), server.stop())

test "Store protocol queries overrun request rate limitation":
## Setup
let
serverKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))
clientKey = generateSecp256k1Key()
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0))

waitFor allFutures(client.start(), server.start())

let mountArchiveRes = server.mountArchive(archiveA)
assert mountArchiveRes.isOk(), mountArchiveRes.error

waitFor server.mountStore((3, 500.millis))

client.mountStoreClient()

## Given
let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
let serverPeer = server.peerInfo.toRemotePeerInfo()

let successProc = proc() {.async.} =
let queryRes = waitFor client.query(req, peer = serverPeer)

check queryRes.isOk()

let response = queryRes.get()
check:
response.messages == msgListA

let failsProc = proc() {.async.} =
let queryRes = waitFor client.query(req, peer = serverPeer)

check queryRes.isErr()
check queryRes.error == "TOO_MANY_REQUESTS"

for count in 0 ..< 3:
waitFor successProc()
waitFor sleepAsync(20.millis)

waitFor failsProc()

waitFor sleepAsync(500.millis)

for count in 0 ..< 3:
waitFor successProc()
waitFor sleepAsync(20.millis)

# Cleanup
waitFor allFutures(client.stop(), server.stop())
78 changes: 75 additions & 3 deletions tests/wakunode_rest/test_rest_lightpush.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import
../../waku/waku_api/rest/lightpush/handlers as lightpush_api,
../../waku/waku_api/rest/lightpush/client as lightpush_api_client,
../../waku/waku_relay,
../../waku/common/ratelimit,
../testlib/wakucore,
../testlib/wakunode
../testlib/wakunode,
../testlib/testutils

proc testWakuNode(): WakuNode =
let
Expand All @@ -41,7 +43,9 @@ type RestLightPushTest = object
restServer: WakuRestServerRef
client: RestClientRef

proc init(T: type RestLightPushTest): Future[T] {.async.} =
proc init(
T: type RestLightPushTest, rateLimit: RateLimitSetting = (0, 0.millis)
): Future[T] {.async.} =
var testSetup = RestLightPushTest()
testSetup.serviceNode = testWakuNode()
testSetup.pushNode = testWakuNode()
Expand All @@ -55,7 +59,7 @@ proc init(T: type RestLightPushTest): Future[T] {.async.} =

await testSetup.consumerNode.mountRelay()
await testSetup.serviceNode.mountRelay()
await testSetup.serviceNode.mountLightPush()
await testSetup.serviceNode.mountLightPush(rateLimit)
testSetup.pushNode.mountLightPushClient()

testSetup.serviceNode.peerManager.addServicePeer(
Expand Down Expand Up @@ -178,6 +182,74 @@ suite "Waku v2 Rest API - lightpush":

await restLightPushTest.shutdown()

# disabled due to this bug in nim-chronos https://github.com/status-im/nim-chronos/issues/500
xasyncTest "Request rate limit push message":
# Given
let budgetCap = 3
let tokenPeriod = 500.millis
let restLightPushTest = await RestLightPushTest.init((budgetCap, tokenPeriod))

restLightPushTest.consumerNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
)
restLightPushTest.serviceNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
)
require:
toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1

# When
let pushProc = proc() {.async.} =
let message: RelayWakuMessage = fakeWakuMessage(
contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")
)
.toRelayWakuMessage()

let requestBody =
PushRequest(pubsubTopic: some(DefaultPubsubTopic), message: message)
let response = await restLightPushTest.client.sendPushRequest(requestBody)

echo "response", $response

# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT

let pushRejectedProc = proc() {.async.} =
let message: RelayWakuMessage = fakeWakuMessage(
contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")
)
.toRelayWakuMessage()

let requestBody =
PushRequest(pubsubTopic: some(DefaultPubsubTopic), message: message)
let response = await restLightPushTest.client.sendPushRequest(requestBody)

echo "response", $response

# Then
check:
response.status == 429

await pushProc()
await pushProc()
await pushProc()
await pushRejectedProc()

await sleepAsync(tokenPeriod)

for runCnt in 0 ..< 3:
let startTime = Moment.now()
for sendCnt in 0 ..< budgetCap:
await pushProc()

let endTime = Moment.now()
let elapsed: Duration = (endTime - startTime)
await sleepAsync(tokenPeriod - elapsed)

await restLightPushTest.shutdown()

## TODO: Re-work this test when lightpush protocol change is done: https://github.com/waku-org/pm/issues/93
## This test is similar when no available peer exists for publish. Currently it is returning success,
## that makes this test not useful.
Expand Down
Loading

0 comments on commit 67339fd

Please sign in to comment.