diff --git a/tests/waku_lightpush/lightpush_utils.nim b/tests/waku_lightpush/lightpush_utils.nim index 6e61bdd4da..c246f4618e 100644 --- a/tests/waku_lightpush/lightpush_utils.nim +++ b/tests/waku_lightpush/lightpush_utils.nim @@ -10,11 +10,13 @@ import ../testlib/[common, wakucore] proc newTestWakuLightpushNode*( - switch: Switch, handler: PushMessageHandler + switch: Switch, + handler: PushMessageHandler, + rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](), ): Future[WakuLightPush] {.async.} = let peerManager = PeerManager.new(switch) - proto = WakuLightPush.new(peerManager, rng, handler) + proto = WakuLightPush.new(peerManager, rng, handler, rateLimitSetting) await proto.start() switch.mount(proto) diff --git a/tests/waku_lightpush/test_all.nim b/tests/waku_lightpush/test_all.nim index d197df2db1..4e49809296 100644 --- a/tests/waku_lightpush/test_all.nim +++ b/tests/waku_lightpush/test_all.nim @@ -1 +1 @@ -import ./test_client +import ./test_client, ./test_ratelimit diff --git a/tests/waku_lightpush/test_client.nim b/tests/waku_lightpush/test_client.nim index 6ee327b513..337e1e09ec 100644 --- a/tests/waku_lightpush/test_client.nim +++ b/tests/waku_lightpush/test_client.nim @@ -203,7 +203,7 @@ suite "Waku Lightpush Client": # 1KiB message2 = fakeWakuMessage( contentTopic = contentTopic, payload = getByteSequence(10 * 1024) - ) # 10KiB + ) # 10KiB message3 = fakeWakuMessage( contentTopic = contentTopic, payload = getByteSequence(100 * 1024) ) # 100KiB diff --git a/tests/waku_lightpush/test_ratelimit.nim b/tests/waku_lightpush/test_ratelimit.nim new file mode 100644 index 0000000000..944d89f109 --- /dev/null +++ b/tests/waku_lightpush/test_ratelimit.nim @@ -0,0 +1,151 @@ +{.used.} + +import + std/[options, strscans], + testutils/unittests, + chronicles, + chronos, + libp2p/crypto/crypto + +import + ../../waku/[ + node/peer_manager, + common/ratelimit, + waku_core, + waku_lightpush, + waku_lightpush/client, + waku_lightpush/common, + waku_lightpush/protocol_metrics, + waku_lightpush/rpc, + waku_lightpush/rpc_codec, + ], + ../testlib/[assertions, wakucore, testasync, futures, testutils], + ./lightpush_utils, + ../resources/[pubsub_topics, content_topics, payloads] + +suite "Rate limited push service": + asyncTest "push message with rate limit not violated": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + ## Given + var handlerFuture = newFuture[(string, WakuMessage)]() + let handler: PushMessageHandler = proc( + peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage + ): Future[WakuLightPushResult[void]] {.async.} = + handlerFuture.complete((pubsubTopic, message)) + return ok() + + let + tokenPeriod = 500.millis + server = + await newTestWakuLightpushNode(serverSwitch, handler, some((3, tokenPeriod))) + client = newTestWakuLightpushClient(clientSwitch) + + let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo() + + let sendMsgProc = proc(): Future[void] {.async.} = + let message = fakeWakuMessage() + + handlerFuture = newFuture[(string, WakuMessage)]() + let requestRes = + await client.publish(DefaultPubsubTopic, message, peer = serverPeerId) + + check await handlerFuture.withTimeout(50.millis) + + assert requestRes.isOk(), requestRes.error + check handlerFuture.finished() + + let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read() + + check: + handledMessagePubsubTopic == DefaultPubsubTopic + handledMessage == message + + let waitInBetweenFor = 20.millis + + # Test cannot be too explicit about the time when the TokenBucket resets + # the internal timer, although in normal use there is no use case to care about it. + var firstWaitExtend = 300.millis + + for runCnt in 0 ..< 3: + let startTime = Moment.now() + for testCnt in 0 ..< 3: + await sendMsgProc() + await sleepAsync(20.millis) + + var endTime = Moment.now() + var elapsed: Duration = (endTime - startTime) + await sleepAsync(tokenPeriod - elapsed + firstWaitExtend) + firstWaitEXtend = 100.millis + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) + + asyncTest "push message with rate limit reject": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + ## Given + var handlerFuture = newFuture[(string, WakuMessage)]() + let handler = proc( + peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage + ): Future[WakuLightPushResult[void]] {.async.} = + handlerFuture.complete((pubsubTopic, message)) + return ok() + + let + server = + await newTestWakuLightpushNode(serverSwitch, handler, some((3, 500.millis))) + client = newTestWakuLightpushClient(clientSwitch) + + let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo() + let topic = DefaultPubsubTopic + + let successProc = proc(): Future[void] {.async.} = + let message = fakeWakuMessage() + handlerFuture = newFuture[(string, WakuMessage)]() + let requestRes = + await client.publish(DefaultPubsubTopic, message, peer = serverPeerId) + discard await handlerFuture.withTimeout(10.millis) + + check: + requestRes.isOk() + handlerFuture.finished() + let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read() + check: + handledMessagePubsubTopic == DefaultPubsubTopic + handledMessage == message + + let rejectProc = proc(): Future[void] {.async.} = + let message = fakeWakuMessage() + handlerFuture = newFuture[(string, WakuMessage)]() + let requestRes = + await client.publish(DefaultPubsubTopic, message, peer = serverPeerId) + discard await handlerFuture.withTimeout(10.millis) + + check: + requestRes.isErr() + requestRes.error == "TOO_MANY_REQUESTS" + + for testCnt in 0 .. 2: + await successProc() + await sleepAsync(20.millis) + + await rejectProc() + + await sleepAsync(500.millis) + + ## next one shall succeed due to the rate limit time window has passed + await successProc() + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) diff --git a/tests/waku_store/test_wakunode_store.nim b/tests/waku_store/test_wakunode_store.nim index e8727d6f7c..cab0b545e4 100644 --- a/tests/waku_store/test_wakunode_store.nim +++ b/tests/waku_store/test_wakunode_store.nim @@ -244,9 +244,7 @@ procSuite "WakuNode - Store": server.wakuFilterClient.registerPushHandler(filterHandler) let resp = waitFor server.filterSubscribe( - some(DefaultPubsubTopic), - DefaultContentTopic, - peer = filterSourcePeer, + some(DefaultPubsubTopic), DefaultContentTopic, peer = filterSourcePeer ) waitFor sleepAsync(100.millis) @@ -319,3 +317,97 @@ procSuite "WakuNode - Store": # Cleanup waitFor allFutures(client.stop(), server.stop()) + + test "Store protocol queries does not violate request rate limitation": + ## Setup + let + serverKey = generateSecp256k1Key() + server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0)) + clientKey = generateSecp256k1Key() + client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0)) + + waitFor allFutures(client.start(), server.start()) + + let mountArchiveRes = server.mountArchive(archiveA) + assert mountArchiveRes.isOk(), mountArchiveRes.error + + waitFor server.mountStore((4, 500.millis)) + + client.mountStoreClient() + + ## Given + let req = HistoryQuery(contentTopics: @[DefaultContentTopic]) + let serverPeer = server.peerInfo.toRemotePeerInfo() + + let requestProc = proc() {.async.} = + let queryRes = waitFor client.query(req, peer = serverPeer) + + assert queryRes.isOk(), queryRes.error + + let response = queryRes.get() + check: + response.messages == msgListA + + for count in 0 ..< 4: + waitFor requestProc() + waitFor sleepAsync(20.millis) + + waitFor sleepAsync(500.millis) + + for count in 0 ..< 4: + waitFor requestProc() + waitFor sleepAsync(20.millis) + + # Cleanup + waitFor allFutures(client.stop(), server.stop()) + + test "Store protocol queries overrun request rate limitation": + ## Setup + let + serverKey = generateSecp256k1Key() + server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0)) + clientKey = generateSecp256k1Key() + client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0)) + + waitFor allFutures(client.start(), server.start()) + + let mountArchiveRes = server.mountArchive(archiveA) + assert mountArchiveRes.isOk(), mountArchiveRes.error + + waitFor server.mountStore((3, 500.millis)) + + client.mountStoreClient() + + ## Given + let req = HistoryQuery(contentTopics: @[DefaultContentTopic]) + let serverPeer = server.peerInfo.toRemotePeerInfo() + + let successProc = proc() {.async.} = + let queryRes = waitFor client.query(req, peer = serverPeer) + + check queryRes.isOk() + + let response = queryRes.get() + check: + response.messages == msgListA + + let failsProc = proc() {.async.} = + let queryRes = waitFor client.query(req, peer = serverPeer) + + check queryRes.isErr() + check queryRes.error == "TOO_MANY_REQUESTS" + + for count in 0 ..< 3: + waitFor successProc() + waitFor sleepAsync(20.millis) + + waitFor failsProc() + + waitFor sleepAsync(500.millis) + + for count in 0 ..< 3: + waitFor successProc() + waitFor sleepAsync(20.millis) + + # Cleanup + waitFor allFutures(client.stop(), server.stop()) diff --git a/tests/wakunode_rest/test_rest_lightpush.nim b/tests/wakunode_rest/test_rest_lightpush.nim index b9a321a826..e2d66155ea 100644 --- a/tests/wakunode_rest/test_rest_lightpush.nim +++ b/tests/wakunode_rest/test_rest_lightpush.nim @@ -22,8 +22,10 @@ import ../../waku/waku_api/rest/lightpush/handlers as lightpush_api, ../../waku/waku_api/rest/lightpush/client as lightpush_api_client, ../../waku/waku_relay, + ../../waku/common/ratelimit, ../testlib/wakucore, - ../testlib/wakunode + ../testlib/wakunode, + ../testlib/testutils proc testWakuNode(): WakuNode = let @@ -41,7 +43,9 @@ type RestLightPushTest = object restServer: WakuRestServerRef client: RestClientRef -proc init(T: type RestLightPushTest): Future[T] {.async.} = +proc init( + T: type RestLightPushTest, rateLimit: RateLimitSetting = (0, 0.millis) +): Future[T] {.async.} = var testSetup = RestLightPushTest() testSetup.serviceNode = testWakuNode() testSetup.pushNode = testWakuNode() @@ -55,7 +59,7 @@ proc init(T: type RestLightPushTest): Future[T] {.async.} = await testSetup.consumerNode.mountRelay() await testSetup.serviceNode.mountRelay() - await testSetup.serviceNode.mountLightPush() + await testSetup.serviceNode.mountLightPush(rateLimit) testSetup.pushNode.mountLightPushClient() testSetup.serviceNode.peerManager.addServicePeer( @@ -178,6 +182,74 @@ suite "Waku v2 Rest API - lightpush": await restLightPushTest.shutdown() + # disabled due to this bug in nim-chronos https://github.com/status-im/nim-chronos/issues/500 + xasyncTest "Request rate limit push message": + # Given + let budgetCap = 3 + let tokenPeriod = 500.millis + let restLightPushTest = await RestLightPushTest.init((budgetCap, tokenPeriod)) + + restLightPushTest.consumerNode.subscribe( + (kind: PubsubSub, topic: DefaultPubsubTopic) + ) + restLightPushTest.serviceNode.subscribe( + (kind: PubsubSub, topic: DefaultPubsubTopic) + ) + require: + toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1 + + # When + let pushProc = proc() {.async.} = + let message: RelayWakuMessage = fakeWakuMessage( + contentTopic = DefaultContentTopic, payload = toBytes("TEST-1") + ) + .toRelayWakuMessage() + + let requestBody = + PushRequest(pubsubTopic: some(DefaultPubsubTopic), message: message) + let response = await restLightPushTest.client.sendPushRequest(requestBody) + + echo "response", $response + + # Then + check: + response.status == 200 + $response.contentType == $MIMETYPE_TEXT + + let pushRejectedProc = proc() {.async.} = + let message: RelayWakuMessage = fakeWakuMessage( + contentTopic = DefaultContentTopic, payload = toBytes("TEST-1") + ) + .toRelayWakuMessage() + + let requestBody = + PushRequest(pubsubTopic: some(DefaultPubsubTopic), message: message) + let response = await restLightPushTest.client.sendPushRequest(requestBody) + + echo "response", $response + + # Then + check: + response.status == 429 + + await pushProc() + await pushProc() + await pushProc() + await pushRejectedProc() + + await sleepAsync(tokenPeriod) + + for runCnt in 0 ..< 3: + let startTime = Moment.now() + for sendCnt in 0 ..< budgetCap: + await pushProc() + + let endTime = Moment.now() + let elapsed: Duration = (endTime - startTime) + await sleepAsync(tokenPeriod - elapsed) + + await restLightPushTest.shutdown() + ## TODO: Re-work this test when lightpush protocol change is done: https://github.com/waku-org/pm/issues/93 ## This test is similar when no available peer exists for publish. Currently it is returning success, ## that makes this test not useful. diff --git a/tests/wakunode_rest/test_rest_store.nim b/tests/wakunode_rest/test_rest_store.nim index 6f10ce07aa..85a92c885b 100644 --- a/tests/wakunode_rest/test_rest_store.nim +++ b/tests/wakunode_rest/test_rest_store.nim @@ -532,7 +532,7 @@ procSuite "Waku v2 Rest API - Store": let node = testWakuNode() await node.start() - let restPort = Port(58014) + let restPort = Port(58017) let restAddress = parseIpAddress("0.0.0.0") let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet() @@ -638,3 +638,145 @@ procSuite "Waku v2 Rest API - Store": storeMessage.timestamp.get() == msg.timestamp storeMessage.ephemeral.get() == msg.ephemeral storeMessage.meta.get() == base64.encode(msg.meta) + + asyncTest "Rate limit store node history query": + # Test adapted from the analogous present at waku_store/test_wakunode_store.nim + let node = testWakuNode() + await node.start() + + let restPort = Port(58018) + let restAddress = parseIpAddress("0.0.0.0") + let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet() + + installStoreApiHandlers(restServer.router, node) + restServer.start() + + # WakuStore setup + let driver: ArchiveDriver = QueueDriver.new() + let mountArchiveRes = node.mountArchive(driver) + assert mountArchiveRes.isOk(), mountArchiveRes.error + + await node.mountStore((2, 500.millis)) + node.mountStoreClient() + + let key = generateEcdsaKey() + var peerSwitch = newStandardSwitch(some(key)) + await peerSwitch.start() + + peerSwitch.mount(node.wakuStore) + + # Now prime it with some history before tests + let timeOrigin = wakucore.now() + let msgList = + @[ + fakeWakuMessage(@[byte 00], ts = ts(00, timeOrigin)), + fakeWakuMessage(@[byte 01], ts = ts(10, timeOrigin)), + fakeWakuMessage(@[byte 02], ts = ts(20, timeOrigin)), + fakeWakuMessage(@[byte 03], ts = ts(30, timeOrigin)), + fakeWakuMessage(@[byte 04], ts = ts(40, timeOrigin)), + fakeWakuMessage(@[byte 05], ts = ts(50, timeOrigin)), + fakeWakuMessage(@[byte 06], ts = ts(60, timeOrigin)), + fakeWakuMessage(@[byte 07], ts = ts(70, timeOrigin)), + fakeWakuMessage(@[byte 08], ts = ts(80, timeOrigin)), + fakeWakuMessage(@[byte 09], ts = ts(90, timeOrigin)), + ] + for msg in msgList: + require (waitFor driver.put(DefaultPubsubTopic, msg)).isOk() + + let client = newRestHttpClient(initTAddress(restAddress, restPort)) + + let remotePeerInfo = peerSwitch.peerInfo.toRemotePeerInfo() + let fullAddr = $remotePeerInfo.addrs[0] & "/p2p/" & $remotePeerInfo.peerId + + var pages = newSeq[seq[WakuMessage]](2) + + # Fields that compose a HistoryCursor object + var reqPubsubTopic = DefaultPubsubTopic + var reqSenderTime = Timestamp(0) + var reqStoreTime = Timestamp(0) + var reqDigest = waku_store.MessageDigest() + + for i in 0 ..< 2: + let response = await client.getStoreMessagesV1( + encodeUrl(fullAddr), + encodeUrl(reqPubsubTopic), + "", # content topics. Empty ignores the field. + "", # start time. Empty ignores the field. + "", # end time. Empty ignores the field. + encodeUrl($reqSenderTime), # sender time + encodeUrl($reqStoreTime), # store time + reqDigest.toRestStringMessageDigest(), + # base64-encoded digest. Empty ignores the field. + "3", # page size. Empty implies default page size. + "true", # ascending + ) + + var wakuMessages = newSeq[WakuMessage](0) + for j in 0 ..< response.data.messages.len: + wakuMessages.add(response.data.messages[j].toWakuMessage()) + + pages[i] = wakuMessages + + # populate the cursor for next page + if response.data.cursor.isSome(): + reqPubsubTopic = response.data.cursor.get().pubsubTopic + reqDigest = response.data.cursor.get().digest + reqSenderTime = response.data.cursor.get().senderTime + reqStoreTime = response.data.cursor.get().storeTime + + check: + response.status == 200 + $response.contentType == $MIMETYPE_JSON + + check: + pages[0] == msgList[0 .. 2] + pages[1] == msgList[3 .. 5] + + # request last third will lead to rate limit rejection + var response = await client.getStoreMessagesV1( + encodeUrl(fullAddr), + encodeUrl(reqPubsubTopic), + "", # content topics. Empty ignores the field. + "", # start time. Empty ignores the field. + "", # end time. Empty ignores the field. + encodeUrl($reqSenderTime), # sender time + encodeUrl($reqStoreTime), # store time + reqDigest.toRestStringMessageDigest(), + # base64-encoded digest. Empty ignores the field. + ) + + check: + response.status == 429 + $response.contentType == $MIMETYPE_TEXT + response.data.error_message.get == "Request rate limmit reached" + + await sleepAsync(500.millis) + + # retry after respective amount of time shall succeed + response = await client.getStoreMessagesV1( + encodeUrl(fullAddr), + encodeUrl(reqPubsubTopic), + "", # content topics. Empty ignores the field. + "", # start time. Empty ignores the field. + "", # end time. Empty ignores the field. + encodeUrl($reqSenderTime), # sender time + encodeUrl($reqStoreTime), # store time + reqDigest.toRestStringMessageDigest(), + # base64-encoded digest. Empty ignores the field. + "5", # page size. Empty implies default page size. + "true", # ascending + ) + + check: + response.status == 200 + $response.contentType == $MIMETYPE_JSON + + var wakuMessages = newSeq[WakuMessage](0) + for j in 0 ..< response.data.messages.len: + wakuMessages.add(response.data.messages[j].toWakuMessage()) + + check wakuMessages == msgList[6 .. 9] + + await restServer.stop() + await restServer.closeWait() + await node.stop() diff --git a/waku/common/ratelimit.nim b/waku/common/ratelimit.nim new file mode 100644 index 0000000000..4e6ba52c62 --- /dev/null +++ b/waku/common/ratelimit.nim @@ -0,0 +1,24 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import std/options +import chronos/timer +import ./tokenbucket + +export tokenbucket + +type RateLimitSetting* = tuple[volume: int, period: Duration] + +let DefaultGlobalNonRelayRateLimit*: RateLimitSetting = (60, 1.minutes) + +proc newTokenBucket*(setting: Option[RateLimitSetting]): Option[TokenBucket] = + if setting.isNone: + return none[TokenBucket]() + + let (volume, period) = setting.get() + if volume <= 0 or period <= 0.seconds: + return none[TokenBucket]() + + return some(TokenBucket.new(volume, period)) diff --git a/waku/common/tokenbucket.nim b/waku/common/tokenbucket.nim new file mode 100644 index 0000000000..abd874c84b --- /dev/null +++ b/waku/common/tokenbucket.nim @@ -0,0 +1,64 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import chronos + +## This is an extract from chronos/ratelimit.nim due to the found bug in the original implementation. +## Unfortunately that bug cannot be solved without harm the original features of TokenBucket class. +## So, this current shortcut is used to enable move ahead with nwaku rate limiter implementation. +type TokenBucket* = ref object + budget*: int ## Current number of tokens in the bucket + budgetCap: int ## Bucket capacity + lastTimeFull: Moment + ## This timer measures the proper periodizaiton of the bucket refilling + fillDuration: Duration ## Refill period + +## Update will take place if bucket is empty and trying to consume tokens. +## It checks if the bucket can be replenished as refill duration is passed or not. +proc update(bucket: TokenBucket, currentTime: Moment) = + if bucket.fillDuration == default(Duration): + bucket.budget = min(bucket.budgetCap, bucket.budget) + return + + let timeDeltaFromLastFull = currentTime - bucket.lastTimeFull + + if timeDeltaFromLastFull.milliseconds < bucket.fillDuration.milliseconds: + return + + bucket.budget = bucket.budgetCap + bucket.lastTimeFull = currentTime + +proc tryConsume*(bucket: TokenBucket, tokens: int, now = Moment.now()): bool = + ## If `tokens` are available, consume them, + ## Otherwhise, return false. + + if bucket.budget == bucket.budgetCap: + bucket.lastTimeFull = now + + if bucket.budget >= tokens: + bucket.budget -= tokens + return true + + bucket.update(now) + + if bucket.budget >= tokens: + bucket.budget -= tokens + return true + else: + return false + +proc replenish*(bucket: TokenBucket, tokens: int, now = Moment.now()) = + ## Add `tokens` to the budget (capped to the bucket capacity) + bucket.budget += tokens + bucket.update(now) + +proc new*(T: type[TokenBucket], budgetCap: int, fillDuration: Duration = 1.seconds): T = + ## Create a TokenBucket + T( + budget: budgetCap, + budgetCap: budgetCap, + fillDuration: fillDuration, + lastTimeFull: Moment.now(), + ) diff --git a/waku/common/waku_service_metrics.nim b/waku/common/waku_service_metrics.nim new file mode 100644 index 0000000000..a6dcec8549 --- /dev/null +++ b/waku/common/waku_service_metrics.nim @@ -0,0 +1,12 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import metrics + +declarePublicCounter waku_service_requests, + "number of non-relay service requests received", ["service"] +declarePublicCounter waku_service_requests_rejected, + "number of non-relay service requests received being rejected due to limit overdue", + ["service"] diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index 37fa9c4076..1dbe7d7dc9 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -589,6 +589,22 @@ type WakuNodeConf* = object name: "websocket-secure-cert-path" .}: string + ## Rate limitation config + ## Currently default to switch of rate limit until become official + requestRateLimit* {. + desc: + "Number of requests to serve by each service in the specified period. Set it to 0 for unlimited", + defaultValue: 0, + name: "request-rate-limit" + .}: int + + ## Currently default to switch of rate limit until become official + requestRatePeriod* {. + desc: "Period of request rate limitation in seconds. Set it to 0 for unlimited", + defaultValue: 0, + name: "request-rate-period" + .}: int64 + ## Parsing # NOTE: Keys are different in nim-libp2p diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index c2c3db0fd9..68d2fa7732 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -27,7 +27,8 @@ import ../waku_lightpush/common, ../waku_archive/driver/builder, ../waku_archive/retention_policy/builder, - ../common/utils/parse_size_units + ../common/utils/parse_size_units, + ../common/ratelimit ## Peer persistence @@ -241,7 +242,9 @@ proc setupProtocols( # Store setup try: - await mountStore(node) + let rateLimitSetting: RateLimitSetting = + (conf.requestRateLimit, chronos.seconds(conf.requestRatePeriod)) + await mountStore(node, rateLimitSetting) except CatchableError: return err("failed to mount waku store protocol: " & getCurrentExceptionMsg()) @@ -256,7 +259,9 @@ proc setupProtocols( # NOTE Must be mounted after relay if conf.lightpush: try: - await mountLightPush(node) + let rateLimitSetting: RateLimitSetting = + (conf.requestRateLimit, chronos.seconds(conf.requestRatePeriod)) + await mountLightPush(node, rateLimitSetting) except CatchableError: return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg()) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 41741b3d65..bec396e2a9 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -45,7 +45,8 @@ import ../waku_peer_exchange, ../waku_rln_relay, ./config, - ./peer_manager + ./peer_manager, + ../common/ratelimit declarePublicCounter waku_node_messages, "number of messages received", ["type"] declarePublicHistogram waku_histogram_message_size, @@ -699,7 +700,9 @@ proc toHistoryResult*(res: ArchiveResult): HistoryResult = ) ) -proc mountStore*(node: WakuNode) {.async, raises: [Defect, LPError].} = +proc mountStore*( + node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit +) {.async, raises: [Defect, LPError].} = info "mounting waku store protocol" if node.wakuArchive.isNil(): @@ -718,7 +721,8 @@ proc mountStore*(node: WakuNode) {.async, raises: [Defect, LPError].} = let response = await node.wakuArchive.findMessages(request) return response.toHistoryResult() - node.wakuStore = WakuStore.new(node.peerManager, node.rng, queryHandler) + node.wakuStore = + WakuStore.new(node.peerManager, node.rng, queryHandler, some(rateLimit)) if node.started: # Node has started already. Let's start store too. @@ -789,7 +793,9 @@ when defined(waku_exp_store_resume): ## Waku lightpush -proc mountLightPush*(node: WakuNode) {.async.} = +proc mountLightPush*( + node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit +) {.async.} = info "mounting light push" var pushHandler: PushMessageHandler @@ -813,7 +819,8 @@ proc mountLightPush*(node: WakuNode) {.async.} = return ok() debug "mounting lightpush with relay" - node.wakuLightPush = WakuLightPush.new(node.peerManager, node.rng, pushHandler) + node.wakuLightPush = + WakuLightPush.new(node.peerManager, node.rng, pushHandler, some(rateLimit)) if node.started: # Node has started already. Let's start lightpush too. diff --git a/waku/waku_api/rest/lightpush/handlers.nim b/waku/waku_api/rest/lightpush/handlers.nim index f341950775..3aa52da81d 100644 --- a/waku/waku_api/rest/lightpush/handlers.nim +++ b/waku/waku_api/rest/lightpush/handlers.nim @@ -77,6 +77,9 @@ proc installLightPushRequestHandler*( return RestApiResponse.serviceUnavailable("Push request timed out") if subFut.value().isErr(): + if subFut.value().error == TooManyRequestsMessage: + return RestApiResponse.tooManyRequests("Request rate limmit reached") + return RestApiResponse.serviceUnavailable( fmt("Failed to request a message push: {subFut.value().error}") ) diff --git a/waku/waku_api/rest/responses.nim b/waku/waku_api/rest/responses.nim index 7bb83f2dbe..105c7423dc 100644 --- a/waku/waku_api/rest/responses.nim +++ b/waku/waku_api/rest/responses.nim @@ -33,6 +33,9 @@ proc preconditionFailed*( ): RestApiResponse = RestApiResponse.error(Http412, msg, $MIMETYPE_TEXT) +proc tooManyRequests*(t: typedesc[RestApiResponse], msg: string = ""): RestApiResponse = + RestApiResponse.error(Http429, msg, $MIMETYPE_TEXT) + proc jsonResponse*( t: typedesc[RestApiResponse], data: auto, status: HttpCode = Http200 ): SerdesResult[RestApiResponse] = diff --git a/waku/waku_api/rest/store/handlers.nim b/waku/waku_api/rest/store/handlers.nim index 01f5259ded..e25bbe4561 100644 --- a/waku/waku_api/rest/store/handlers.nim +++ b/waku/waku_api/rest/store/handlers.nim @@ -39,9 +39,15 @@ proc performHistoryQuery( let res = queryFut.read() if res.isErr(): - const msg = "Error occurred in queryFut.read()" - error msg, error = res.error - return RestApiResponse.internalServerError(fmt("{msg} [{res.error}]")) + const TooManyRequestErrorStr = + $HistoryError(kind: HistoryErrorKind.TOO_MANY_REQUESTS) + if res.error == TooManyRequestErrorStr: + debug "Request rate limmit reached on peer ", storePeer + return RestApiResponse.tooManyRequests("Request rate limmit reached") + else: + const msg = "Error occurred in queryFut.read()" + error msg, error = res.error + return RestApiResponse.internalServerError(fmt("{msg} [{res.error}]")) let storeResp = res.value.toStoreResponseRest() let resp = RestApiResponse.jsonResponse(storeResp, status = Http200) diff --git a/waku/waku_lightpush/common.nim b/waku/waku_lightpush/common.nim index 75f507b527..4e376b1aed 100644 --- a/waku/waku_lightpush/common.nim +++ b/waku/waku_lightpush/common.nim @@ -13,3 +13,5 @@ type WakuLightPushResult*[T] = Result[T, string] type PushMessageHandler* = proc( peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage ): Future[WakuLightPushResult[void]] {.async.} + +const TooManyRequestsMessage* = "TOO_MANY_REQUESTS" diff --git a/waku/waku_lightpush/protocol.nim b/waku/waku_lightpush/protocol.nim index ddac2d9ec3..9cec6f7f48 100644 --- a/waku/waku_lightpush/protocol.nim +++ b/waku/waku_lightpush/protocol.nim @@ -11,7 +11,11 @@ import ./common, ./rpc, ./rpc_codec, - ./protocol_metrics + ./protocol_metrics, + ../common/ratelimit, + ../common/waku_service_metrics + +export ratelimit logScope: topics = "waku lightpush" @@ -20,6 +24,7 @@ type WakuLightPush* = ref object of LPProtocol rng*: ref rand.HmacDrbgContext peerManager*: PeerManager pushHandler*: PushMessageHandler + requestRateLimiter*: Option[TokenBucket] proc handleRequest*( wl: WakuLightPush, peerId: PeerId, buffer: seq[byte] @@ -27,6 +32,7 @@ proc handleRequest*( let reqDecodeRes = PushRPC.decode(buffer) var isSuccess = false + isRejectedDueRateLimit = false pushResponseInfo = "" requestId = "" @@ -34,7 +40,16 @@ proc handleRequest*( pushResponseInfo = decodeRpcFailure & ": " & $reqDecodeRes.error elif reqDecodeRes.get().request.isNone(): pushResponseInfo = emptyRequestBodyFailure + elif wl.requestRateLimiter.isSome() and not wl.requestRateLimiter.get().tryConsume(1): + isRejectedDueRateLimit = true + let pushRpcRequest = reqDecodeRes.get() + debug "lightpush request rejected due rate limit exceeded", + peerId = peerId, requestId = pushRpcRequest.requestId + pushResponseInfo = TooManyRequestsMessage + waku_service_requests_rejected.inc(labelValues = ["Lightpush"]) else: + waku_service_requests.inc(labelValues = ["Lightpush"]) + let pushRpcRequest = reqDecodeRes.get() requestId = pushRpcRequest.requestId @@ -55,7 +70,7 @@ proc handleRequest*( isSuccess = handleRes.isOk() pushResponseInfo = (if isSuccess: "OK" else: handleRes.error) - if not isSuccess: + if not isSuccess and not isRejectedDueRateLimit: waku_lightpush_errors.inc(labelValues = [pushResponseInfo]) error "failed to push message", error = pushResponseInfo let response = PushResponse(isSuccess: isSuccess, info: some(pushResponseInfo)) @@ -76,7 +91,13 @@ proc new*( peerManager: PeerManager, rng: ref rand.HmacDrbgContext, pushHandler: PushMessageHandler, + rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](), ): T = - let wl = WakuLightPush(rng: rng, peerManager: peerManager, pushHandler: pushHandler) + let wl = WakuLightPush( + rng: rng, + peerManager: peerManager, + pushHandler: pushHandler, + requestRateLimiter: newTokenBucket(rateLimitSetting), + ) wl.initProtocolHandler() return wl diff --git a/waku/waku_lightpush/protocol_metrics.nim b/waku/waku_lightpush/protocol_metrics.nim index 2c62d319dd..8e3b58f9da 100644 --- a/waku/waku_lightpush/protocol_metrics.nim +++ b/waku/waku_lightpush/protocol_metrics.nim @@ -18,3 +18,4 @@ const emptyRequestBodyFailure* = "empty_request_body_failure" emptyResponseBodyFailure* = "empty_response_body_failure" messagePushFailure* = "message_push_failure" + requestLimitReachedFailure* = "request_limit_reached_failure" diff --git a/waku/waku_store/common.nim b/waku/waku_store/common.nim index 19f4661c9e..67af41a68a 100644 --- a/waku/waku_store/common.nim +++ b/waku/waku_store/common.nim @@ -57,6 +57,7 @@ type UNKNOWN = uint32(000) BAD_RESPONSE = uint32(300) BAD_REQUEST = uint32(400) + TOO_MANY_REQUESTS = uint32(429) SERVICE_UNAVAILABLE = uint32(503) PEER_DIAL_FAILURE = uint32(504) @@ -73,7 +74,7 @@ type proc parse*(T: type HistoryErrorKind, kind: uint32): T = case kind - of 000, 200, 300, 400, 503: + of 000, 200, 300, 400, 429, 503: HistoryErrorKind(kind) else: HistoryErrorKind.UNKNOWN @@ -86,6 +87,8 @@ proc `$`*(err: HistoryError): string = "BAD_RESPONSE: " & err.cause of HistoryErrorKind.BAD_REQUEST: "BAD_REQUEST: " & err.cause + of HistoryErrorKind.TOO_MANY_REQUESTS: + "TOO_MANY_REQUESTS" of HistoryErrorKind.SERVICE_UNAVAILABLE: "SERVICE_UNAVAILABLE" of HistoryErrorKind.UNKNOWN: diff --git a/waku/waku_store/protocol.nim b/waku/waku_store/protocol.nim index be88b0ac21..7f7118dab4 100644 --- a/waku/waku_store/protocol.nim +++ b/waku/waku_store/protocol.nim @@ -18,7 +18,14 @@ import libp2p/stream/connection, metrics import - ../waku_core, ../node/peer_manager, ./common, ./rpc, ./rpc_codec, ./protocol_metrics + ../waku_core, + ../node/peer_manager, + ./common, + ./rpc, + ./rpc_codec, + ./protocol_metrics, + ../common/ratelimit, + ../common/waku_service_metrics logScope: topics = "waku store" @@ -33,6 +40,7 @@ type WakuStore* = ref object of LPProtocol peerManager: PeerManager rng: ref rand.HmacDrbgContext queryHandler*: HistoryQueryHandler + requestRateLimiter*: Option[TokenBucket] ## Protocol @@ -55,6 +63,18 @@ proc initProtocolHandler(ws: WakuStore) = # TODO: Return (BAD_REQUEST, cause: "empty query") return + if ws.requestRateLimiter.isSome() and not ws.requestRateLimiter.get().tryConsume(1): + trace "store query request rejected due rate limit exceeded", + peerId = $conn.peerId, requestId = reqRpc.requestId + let error = HistoryError(kind: HistoryErrorKind.TOO_MANY_REQUESTS).toRPC() + let response = HistoryResponseRPC(error: error) + let rpc = HistoryRPC(requestId: reqRpc.requestId, response: some(response)) + await conn.writeLp(rpc.encode().buffer) + waku_service_requests_rejected.inc(labelValues = ["Store"]) + return + + waku_service_requests.inc(labelValues = ["Store"]) + let requestId = reqRpc.requestId request = reqRpc.query.get().toAPI() @@ -101,11 +121,17 @@ proc new*( peerManager: PeerManager, rng: ref rand.HmacDrbgContext, queryHandler: HistoryQueryHandler, + rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](), ): T = # Raise a defect if history query handler is nil if queryHandler.isNil(): raise newException(NilAccessDefect, "history query handler is nil") - let ws = WakuStore(rng: rng, peerManager: peerManager, queryHandler: queryHandler) + let ws = WakuStore( + rng: rng, + peerManager: peerManager, + queryHandler: queryHandler, + requestRateLimiter: newTokenBucket(rateLimitSetting), + ) ws.initProtocolHandler() ws diff --git a/waku/waku_store/rpc.nim b/waku/waku_store/rpc.nim index 22e0d55ec6..9a8887aeef 100644 --- a/waku/waku_store/rpc.nim +++ b/waku/waku_store/rpc.nim @@ -62,6 +62,7 @@ type ## the state of its request NONE = uint32(0) INVALID_CURSOR = uint32(1) + TOO_MANY_REQUESTS = uint32(429) SERVICE_UNAVAILABLE = uint32(503) HistoryResponseRPC* = object @@ -76,7 +77,7 @@ type proc parse*(T: type HistoryResponseErrorRPC, kind: uint32): T = case kind - of 0, 1, 503: + of 0, 1, 429, 503: HistoryResponseErrorRPC(kind) else: # TODO: Improve error variants/move to satus codes @@ -169,6 +170,8 @@ proc toRPC*(err: HistoryError): HistoryResponseErrorRPC = of HistoryErrorKind.BAD_REQUEST: # TODO: Respond aksi with the reason HistoryResponseErrorRPC.INVALID_CURSOR + of HistoryErrorKind.TOO_MANY_REQUESTS: + HistoryResponseErrorRPC.TOO_MANY_REQUESTS of HistoryErrorKind.SERVICE_UNAVAILABLE: HistoryResponseErrorRPC.SERVICE_UNAVAILABLE else: @@ -179,6 +182,8 @@ proc toAPI*(err: HistoryResponseErrorRPC): HistoryError = case err of HistoryResponseErrorRPC.INVALID_CURSOR: HistoryError(kind: HistoryErrorKind.BAD_REQUEST, cause: "invalid cursor") + of HistoryResponseErrorRPC.TOO_MANY_REQUESTS: + HistoryError(kind: HistoryErrorKind.TOO_MANY_REQUESTS) of HistoryResponseErrorRPC.SERVICE_UNAVAILABLE: HistoryError(kind: HistoryErrorKind.SERVICE_UNAVAILABLE) else: