From 4936df6c10bbdf581079443d76a2e7a763b2cb20 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Thu, 9 May 2024 20:07:49 +0200 Subject: [PATCH] feat: Added flexible rate limit checks for store, legacy store and lightpush (#2668) * Added flexible rate limit checks for store, legacy store and lightpush. Also added rate and traffic metrics. * Fix chat2 after WakuLegacyStoreCodec rename * Update waku/common/ratelimit.nim Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> * Update waku/common/ratelimit.nim Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> * Update waku/waku_store_legacy/protocol.nim Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> * Fix review findings, added limit to debug logs --------- Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> --- apps/chat2/chat2.nim | 2 +- tests/waku_store_legacy/test_resume.nim | 626 ++++++++++--------- waku/common/ratelimit.nim | 35 +- waku/common/tokenbucket.nim | 3 + waku/common/waku_service_metrics.nim | 6 + waku/factory/node_factory.nim | 4 +- waku/node/waku_node.nim | 4 +- waku/waku_api/rest/admin/handlers.nim | 18 +- waku/waku_api/rest/legacy_store/handlers.nim | 2 +- waku/waku_lightpush/protocol.nim | 41 +- waku/waku_store/protocol.nim | 52 +- waku/waku_store_legacy/client.nim | 4 +- waku/waku_store_legacy/common.nim | 2 +- waku/waku_store_legacy/protocol.nim | 150 +++-- 14 files changed, 529 insertions(+), 420 deletions(-) diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index 7aee403ae7..64ab61c392 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -470,7 +470,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = echo "Connecting to storenode: " & $(storenode.get()) node.mountLegacyStoreClient() - node.peerManager.addServicePeer(storenode.get(), WakuStoreCodec) + node.peerManager.addServicePeer(storenode.get(), WakuLegacyStoreCodec) proc storeHandler(response: HistoryResponse) {.gcsafe.} = for msg in response.messages: diff --git a/tests/waku_store_legacy/test_resume.nim b/tests/waku_store_legacy/test_resume.nim index d061481f64..a35c2f9bd4 100644 --- a/tests/waku_store_legacy/test_resume.nim +++ b/tests/waku_store_legacy/test_resume.nim @@ -1,332 +1,340 @@ {.used.} -import - std/[options, tables, sets], - testutils/unittests, - chronos, - chronicles, - libp2p/crypto/crypto -import - ../../waku/common/databases/db_sqlite, - ../../waku/waku_archive/driver, - ../../waku/waku_archive/driver/sqlite_driver/sqlite_driver, - ../../waku/node/peer_manager, - ../../waku/waku_core, - ../../waku/waku_core/message/digest, - ../../waku/waku_store_legacy, - ../waku_store_legacy/store_utils, - ../waku_archive/archive_utils, - ./testlib/common, - ./testlib/switch - -procSuite "Waku Store - resume store": - ## Fixtures - let storeA = block: - let store = newTestMessageStore() - let msgList = - @[ - fakeWakuMessage( - payload = @[byte 0], contentTopic = ContentTopic("2"), ts = ts(0) - ), - fakeWakuMessage( - payload = @[byte 1], contentTopic = ContentTopic("1"), ts = ts(1) - ), - fakeWakuMessage( - payload = @[byte 2], contentTopic = ContentTopic("2"), ts = ts(2) - ), - fakeWakuMessage( - payload = @[byte 3], contentTopic = ContentTopic("1"), ts = ts(3) - ), - fakeWakuMessage( - payload = @[byte 4], contentTopic = ContentTopic("2"), ts = ts(4) - ), - fakeWakuMessage( - payload = @[byte 5], contentTopic = ContentTopic("1"), ts = ts(5) - ), - fakeWakuMessage( - payload = @[byte 6], contentTopic = ContentTopic("2"), ts = ts(6) - ), - fakeWakuMessage( - payload = @[byte 7], contentTopic = ContentTopic("1"), ts = ts(7) - ), - fakeWakuMessage( - payload = @[byte 8], contentTopic = ContentTopic("2"), ts = ts(8) - ), - fakeWakuMessage( - payload = @[byte 9], contentTopic = ContentTopic("1"), ts = ts(9) - ), - ] - - for msg in msgList: - require store - .put( - DefaultPubsubTopic, - msg, - computeDigest(msg), - computeMessageHash(DefaultPubsubTopic, msg), - msg.timestamp, +when defined(waku_exp_store_resume): + # TODO: Review store resume test cases (#1282) + # Ongoing changes to test code base had ruin this test meanwhile, need to investigate and fix + + import + std/[options, tables, sets], + testutils/unittests, + chronos, + chronicles, + libp2p/crypto/crypto + import + ../../waku/common/databases/db_sqlite, + ../../waku/waku_archive/driver, + ../../waku/waku_archive/driver/sqlite_driver/sqlite_driver, + ../../waku/node/peer_manager, + ../../waku/waku_core, + ../../waku/waku_core/message/digest, + ../../waku/waku_store_legacy, + ../waku_store_legacy/store_utils, + ../waku_archive/archive_utils, + ./testlib/common, + ./testlib/switch + + procSuite "Waku Store - resume store": + ## Fixtures + let storeA = block: + let store = newTestMessageStore() + let msgList = + @[ + fakeWakuMessage( + payload = @[byte 0], contentTopic = ContentTopic("2"), ts = ts(0) + ), + fakeWakuMessage( + payload = @[byte 1], contentTopic = ContentTopic("1"), ts = ts(1) + ), + fakeWakuMessage( + payload = @[byte 2], contentTopic = ContentTopic("2"), ts = ts(2) + ), + fakeWakuMessage( + payload = @[byte 3], contentTopic = ContentTopic("1"), ts = ts(3) + ), + fakeWakuMessage( + payload = @[byte 4], contentTopic = ContentTopic("2"), ts = ts(4) + ), + fakeWakuMessage( + payload = @[byte 5], contentTopic = ContentTopic("1"), ts = ts(5) + ), + fakeWakuMessage( + payload = @[byte 6], contentTopic = ContentTopic("2"), ts = ts(6) + ), + fakeWakuMessage( + payload = @[byte 7], contentTopic = ContentTopic("1"), ts = ts(7) + ), + fakeWakuMessage( + payload = @[byte 8], contentTopic = ContentTopic("2"), ts = ts(8) + ), + fakeWakuMessage( + payload = @[byte 9], contentTopic = ContentTopic("1"), ts = ts(9) + ), + ] + + for msg in msgList: + require store + .put( + DefaultPubsubTopic, + msg, + computeDigest(msg), + computeMessageHash(DefaultPubsubTopic, msg), + msg.timestamp, + ) + .isOk() + + store + + let storeB = block: + let store = newTestMessageStore() + let msgList2 = + @[ + fakeWakuMessage( + payload = @[byte 0], contentTopic = ContentTopic("2"), ts = ts(0) + ), + fakeWakuMessage( + payload = @[byte 11], contentTopic = ContentTopic("1"), ts = ts(1) + ), + fakeWakuMessage( + payload = @[byte 12], contentTopic = ContentTopic("2"), ts = ts(2) + ), + fakeWakuMessage( + payload = @[byte 3], contentTopic = ContentTopic("1"), ts = ts(3) + ), + fakeWakuMessage( + payload = @[byte 4], contentTopic = ContentTopic("2"), ts = ts(4) + ), + fakeWakuMessage( + payload = @[byte 5], contentTopic = ContentTopic("1"), ts = ts(5) + ), + fakeWakuMessage( + payload = @[byte 13], contentTopic = ContentTopic("2"), ts = ts(6) + ), + fakeWakuMessage( + payload = @[byte 14], contentTopic = ContentTopic("1"), ts = ts(7) + ), + ] + + for msg in msgList2: + require store + .put( + DefaultPubsubTopic, + msg, + computeDigest(msg), + computeMessageHash(DefaultPubsubTopic, msg), + msg.timestamp, + ) + .isOk() + + store + + asyncTest "multiple query to multiple peers with pagination": + ## Setup + let + serverSwitchA = newTestSwitch() + serverSwitchB = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures( + serverSwitchA.start(), serverSwitchB.start(), clientSwitch.start() ) - .isOk() - store - - let storeB = block: - let store = newTestMessageStore() - let msgList2 = - @[ - fakeWakuMessage( - payload = @[byte 0], contentTopic = ContentTopic("2"), ts = ts(0) - ), - fakeWakuMessage( - payload = @[byte 11], contentTopic = ContentTopic("1"), ts = ts(1) - ), - fakeWakuMessage( - payload = @[byte 12], contentTopic = ContentTopic("2"), ts = ts(2) - ), - fakeWakuMessage( - payload = @[byte 3], contentTopic = ContentTopic("1"), ts = ts(3) - ), - fakeWakuMessage( - payload = @[byte 4], contentTopic = ContentTopic("2"), ts = ts(4) - ), - fakeWakuMessage( - payload = @[byte 5], contentTopic = ContentTopic("1"), ts = ts(5) - ), - fakeWakuMessage( - payload = @[byte 13], contentTopic = ContentTopic("2"), ts = ts(6) - ), - fakeWakuMessage( - payload = @[byte 14], contentTopic = ContentTopic("1"), ts = ts(7) - ), - ] - - for msg in msgList2: - require store - .put( - DefaultPubsubTopic, - msg, - computeDigest(msg), - computeMessageHash(DefaultPubsubTopic, msg), - msg.timestamp, - ) - .isOk() + let + serverA = await newTestWakuStoreNode(serverSwitchA, store = testStore) + serverB = await newTestWakuStoreNode(serverSwitchB, store = testStore) + client = newTestWakuStoreClient(clientSwitch) - store + ## Given + let peers = + @[ + serverSwitchA.peerInfo.toRemotePeerInfo(), + serverSwitchB.peerInfo.toRemotePeerInfo(), + ] + let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 5) - asyncTest "multiple query to multiple peers with pagination": - ## Setup - let - serverSwitchA = newTestSwitch() - serverSwitchB = newTestSwitch() - clientSwitch = newTestSwitch() + ## When + let res = await client.queryLoop(req, peers) - await allFutures(serverSwitchA.start(), serverSwitchB.start(), clientSwitch.start()) + ## Then + check: + res.isOk() - let - serverA = await newTestWakuStoreNode(serverSwitchA, store = testStore) - serverB = await newTestWakuStoreNode(serverSwitchB, store = testStore) - client = newTestWakuStoreClient(clientSwitch) + let response = res.tryGet() + check: + response.len == 10 - ## Given - let peers = - @[ - serverSwitchA.peerInfo.toRemotePeerInfo(), - serverSwitchB.peerInfo.toRemotePeerInfo(), - ] - let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 5) + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitchA.stop(), serverSwitchB.stop()) - ## When - let res = await client.queryLoop(req, peers) - - ## Then - check: - res.isOk() + asyncTest "resume message history": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() - let response = res.tryGet() - check: - response.len == 10 - - ## Cleanup - await allFutures(clientSwitch.stop(), serverSwitchA.stop(), serverSwitchB.stop()) - - asyncTest "resume message history": - ## Setup - let - serverSwitch = newTestSwitch() - clientSwitch = newTestSwitch() - - await allFutures(serverSwitch.start(), clientSwitch.start()) + await allFutures(serverSwitch.start(), clientSwitch.start()) - let - server = await newTestWakuStore(serverSwitch, store = storeA) - client = await newTestWakuStore(clientSwitch) + let + server = await newTestWakuStore(serverSwitch, store = storeA) + client = await newTestWakuStore(clientSwitch) - client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) - - ## When - let res = await client.resume() - - ## Then - check res.isOk() + client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) - let resumedMessagesCount = res.tryGet() - let storedMessagesCount = client.store.getMessagesCount().tryGet() - check: - resumedMessagesCount == 10 - storedMessagesCount == 10 + ## When + let res = await client.resume() - ## Cleanup - await allFutures(clientSwitch.stop(), serverSwitch.stop()) + ## Then + check res.isOk() - asyncTest "resume history from a list of candidates - offline peer": - ## Setup - let - clientSwitch = newTestSwitch() - offlineSwitch = newTestSwitch() - - await clientSwitch.start() - - let client = await newTestWakuStore(clientSwitch) + let resumedMessagesCount = res.tryGet() + let storedMessagesCount = client.store.getMessagesCount().tryGet() + check: + resumedMessagesCount == 10 + storedMessagesCount == 10 - ## Given - let peers = @[offlineSwitch.peerInfo.toRemotePeerInfo()] - - ## When - let res = await client.resume(some(peers)) - - ## Then - check res.isErr() - - ## Cleanup - await clientSwitch.stop() - - asyncTest "resume history from a list of candidates - online and offline peers": - ## Setup - let - offlineSwitch = newTestSwitch() - serverASwitch = newTestSwitch() - serverBSwitch = newTestSwitch() - clientSwitch = newTestSwitch() - - await allFutures(serverASwitch.start(), serverBSwitch.start(), clientSwitch.start()) - - let - serverA = await newTestWakuStore(serverASwitch, store = storeA) - serverB = await newTestWakuStore(serverBSwitch, store = storeB) - client = await newTestWakuStore(clientSwitch) - - ## Given - let peers = - @[ - offlineSwitch.peerInfo.toRemotePeerInfo(), - serverASwitch.peerInfo.toRemotePeerInfo(), - serverBSwitch.peerInfo.toRemotePeerInfo(), - ] - - ## When - let res = await client.resume(some(peers)) - - ## Then - # `client` is expected to retrieve 14 messages: - # - The store mounted on `serverB` holds 10 messages (see `storeA` fixture) - # - The store mounted on `serverB` holds 7 messages (see `storeB` fixture) - # Both stores share 3 messages, resulting in 14 unique messages in total - check res.isOk() - - let restoredMessagesCount = res.tryGet() - let storedMessagesCount = client.store.getMessagesCount().tryGet() - check: - restoredMessagesCount == 14 - storedMessagesCount == 14 - - ## Cleanup - await allFutures(serverASwitch.stop(), serverBSwitch.stop(), clientSwitch.stop()) - -suite "WakuNode - waku store": - asyncTest "Resume proc fetches the history": - ## Setup - let - serverKey = generateSecp256k1Key() - server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0)) - clientKey = generateSecp256k1Key() - client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0)) - - await allFutures(client.start(), server.start()) - - let driver = newSqliteArchiveDriver() - server.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy)) - await server.mountStore() - - let clientStore = StoreQueueRef.new() - await client.mountStore(store = clientStore) - client.mountStoreClient(store = clientStore) - - ## Given - let message = fakeWakuMessage() - require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk() - - let serverPeer = server.peerInfo.toRemotePeerInfo() - - ## When - await client.resume(some(@[serverPeer])) - - # Then - check: - client.wakuStore.store.getMessagesCount().tryGet() == 1 - - ## Cleanup - await allFutures(client.stop(), server.stop()) - - asyncTest "Resume proc discards duplicate messages": - ## Setup - let - serverKey = generateSecp256k1Key() - server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0)) - clientKey = generateSecp256k1Key() - client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0)) - - await allFutures(server.start(), client.start()) - await server.mountStore(store = StoreQueueRef.new()) - - let clientStore = StoreQueueRef.new() - await client.mountStore(store = clientStore) - client.mountStoreClient(store = clientStore) - - ## Given - let timeOrigin = now() - let - msg1 = fakeWakuMessage( - payload = "hello world1", ts = (timeOrigin + getNanoSecondTime(1)) - ) - msg2 = fakeWakuMessage( - payload = "hello world2", ts = (timeOrigin + getNanoSecondTime(2)) - ) - msg3 = fakeWakuMessage( - payload = "hello world3", ts = (timeOrigin + getNanoSecondTime(3)) - ) + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) + + asyncTest "resume history from a list of candidates - offline peer": + ## Setup + let + clientSwitch = newTestSwitch() + offlineSwitch = newTestSwitch() + + await clientSwitch.start() + + let client = await newTestWakuStore(clientSwitch) - require server.wakuStore.store.put(DefaultPubsubTopic, msg1).isOk() - require server.wakuStore.store.put(DefaultPubsubTopic, msg2).isOk() + ## Given + let peers = @[offlineSwitch.peerInfo.toRemotePeerInfo()] - # Insert the same message in both node's store - let - receivedTime3 = now() + getNanosecondTime(10) - digest3 = computeDigest(msg3) - require server.wakuStore.store - .put(DefaultPubsubTopic, msg3, digest3, receivedTime3) - .isOk() - require client.wakuStore.store - .put(DefaultPubsubTopic, msg3, digest3, receivedTime3) - .isOk() + ## When + let res = await client.resume(some(peers)) + + ## Then + check res.isErr() + + ## Cleanup + await clientSwitch.stop() + + asyncTest "resume history from a list of candidates - online and offline peers": + ## Setup + let + offlineSwitch = newTestSwitch() + serverASwitch = newTestSwitch() + serverBSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures( + serverASwitch.start(), serverBSwitch.start(), clientSwitch.start() + ) + + let + serverA = await newTestWakuStore(serverASwitch, store = storeA) + serverB = await newTestWakuStore(serverBSwitch, store = storeB) + client = await newTestWakuStore(clientSwitch) + + ## Given + let peers = + @[ + offlineSwitch.peerInfo.toRemotePeerInfo(), + serverASwitch.peerInfo.toRemotePeerInfo(), + serverBSwitch.peerInfo.toRemotePeerInfo(), + ] + + ## When + let res = await client.resume(some(peers)) + + ## Then + # `client` is expected to retrieve 14 messages: + # - The store mounted on `serverB` holds 10 messages (see `storeA` fixture) + # - The store mounted on `serverB` holds 7 messages (see `storeB` fixture) + # Both stores share 3 messages, resulting in 14 unique messages in total + check res.isOk() + + let restoredMessagesCount = res.tryGet() + let storedMessagesCount = client.store.getMessagesCount().tryGet() + check: + restoredMessagesCount == 14 + storedMessagesCount == 14 + + ## Cleanup + await allFutures(serverASwitch.stop(), serverBSwitch.stop(), clientSwitch.stop()) + + suite "WakuNode - waku store": + asyncTest "Resume proc fetches the history": + ## Setup + let + serverKey = generateSecp256k1Key() + server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0)) + clientKey = generateSecp256k1Key() + client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0)) + + await allFutures(client.start(), server.start()) + + let driver = newSqliteArchiveDriver() + server.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy)) + await server.mountStore() + + let clientStore = StoreQueueRef.new() + await client.mountStore(store = clientStore) + client.mountStoreClient(store = clientStore) + + ## Given + let message = fakeWakuMessage() + require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk() + + let serverPeer = server.peerInfo.toRemotePeerInfo() + + ## When + await client.resume(some(@[serverPeer])) + + # Then + check: + client.wakuStore.store.getMessagesCount().tryGet() == 1 + + ## Cleanup + await allFutures(client.stop(), server.stop()) + + asyncTest "Resume proc discards duplicate messages": + ## Setup + let + serverKey = generateSecp256k1Key() + server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0)) + clientKey = generateSecp256k1Key() + client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0)) + + await allFutures(server.start(), client.start()) + await server.mountStore(store = StoreQueueRef.new()) + + let clientStore = StoreQueueRef.new() + await client.mountStore(store = clientStore) + client.mountStoreClient(store = clientStore) + + ## Given + let timeOrigin = now() + let + msg1 = fakeWakuMessage( + payload = "hello world1", ts = (timeOrigin + getNanoSecondTime(1)) + ) + msg2 = fakeWakuMessage( + payload = "hello world2", ts = (timeOrigin + getNanoSecondTime(2)) + ) + msg3 = fakeWakuMessage( + payload = "hello world3", ts = (timeOrigin + getNanoSecondTime(3)) + ) + + require server.wakuStore.store.put(DefaultPubsubTopic, msg1).isOk() + require server.wakuStore.store.put(DefaultPubsubTopic, msg2).isOk() + + # Insert the same message in both node's store + let + receivedTime3 = now() + getNanosecondTime(10) + digest3 = computeDigest(msg3) + require server.wakuStore.store + .put(DefaultPubsubTopic, msg3, digest3, receivedTime3) + .isOk() + require client.wakuStore.store + .put(DefaultPubsubTopic, msg3, digest3, receivedTime3) + .isOk() - let serverPeer = server.peerInfo.toRemotePeerInfo() + let serverPeer = server.peerInfo.toRemotePeerInfo() - ## When - await client.resume(some(@[serverPeer])) + ## When + await client.resume(some(@[serverPeer])) - ## Then - check: - # If the duplicates are discarded properly, then the total number of messages after resume should be 3 - client.wakuStore.store.getMessagesCount().tryGet() == 3 + ## Then + check: + # If the duplicates are discarded properly, then the total number of messages after resume should be 3 + client.wakuStore.store.getMessagesCount().tryGet() == 3 - await allFutures(client.stop(), server.stop()) + await allFutures(client.stop(), server.stop()) diff --git a/waku/common/ratelimit.nim b/waku/common/ratelimit.nim index 107eee5adf..8fb91219a7 100644 --- a/waku/common/ratelimit.nim +++ b/waku/common/ratelimit.nim @@ -3,8 +3,8 @@ when (NimMajor, NimMinor) < (1, 4): else: {.push raises: [].} -import std/options -import chronos/timer +import std/options, chronos/timer, libp2p/stream/connection + import ./tokenbucket export tokenbucket @@ -23,3 +23,34 @@ proc newTokenBucket*(setting: Option[RateLimitSetting]): Option[TokenBucket] = return none[TokenBucket]() return some(TokenBucket.new(volume, period)) + +proc checkUsage( + t: var Option[TokenBucket], proto: string, conn: Connection +): bool {.raises: [].} = + if t.isNone(): + return true + + let tokenBucket = t.get() + if not tokenBucket.tryConsume(1): + return false + + return true + +template checkUsageLimit*( + t: var Option[TokenBucket], + proto: string, + conn: Connection, + bodyWithinLimit, bodyRejected: untyped, +) = + if t.checkUsage(proto, conn): + waku_service_requests.inc(labelValues = [proto]) + bodyWithinLimit + else: + waku_service_requests_rejected.inc(labelValues = [proto]) + bodyRejected + +func `$`*(ob: Option[TokenBucket]): string {.inline.} = + if ob.isNone(): + return "no-limit" + + return $ob.get() diff --git a/waku/common/tokenbucket.nim b/waku/common/tokenbucket.nim index abd874c84b..2223e0bbfc 100644 --- a/waku/common/tokenbucket.nim +++ b/waku/common/tokenbucket.nim @@ -62,3 +62,6 @@ proc new*(T: type[TokenBucket], budgetCap: int, fillDuration: Duration = 1.secon fillDuration: fillDuration, lastTimeFull: Moment.now(), ) + +func `$`*(b: TokenBucket): string {.inline.} = + return $b.budgetCap & "/" & $b.fillDuration diff --git a/waku/common/waku_service_metrics.nim b/waku/common/waku_service_metrics.nim index a6dcec8549..a58c48d1b8 100644 --- a/waku/common/waku_service_metrics.nim +++ b/waku/common/waku_service_metrics.nim @@ -10,3 +10,9 @@ declarePublicCounter waku_service_requests, declarePublicCounter waku_service_requests_rejected, "number of non-relay service requests received being rejected due to limit overdue", ["service"] + +declarePublicCounter waku_service_inbound_network_bytes, + "total incoming traffic of specific waku services", labels = ["service"] + +declarePublicCounter waku_service_outbound_network_bytes, + "total outgoing traffic of specific waku services", labels = ["service"] diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 7a14c9626b..f3cbb7fe74 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -269,7 +269,9 @@ proc setupProtocols( if conf.storenode != "": let storeNode = parsePeerInfo(conf.storenode) if storeNode.isOk(): - node.peerManager.addServicePeer(storeNode.value, legacy_common.WakuStoreCodec) + node.peerManager.addServicePeer( + storeNode.value, legacy_common.WakuLegacyStoreCodec + ) else: return err("failed to set node waku legacy store peer: " & storeNode.error) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 273a699eee..44076a91a0 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -735,7 +735,7 @@ proc mountLegacyStore*( await node.wakuLegacyStore.start() node.switch.mount( - node.wakuLegacyStore, protocolMatcher(legacy_store_common.WakuStoreCodec) + node.wakuLegacyStore, protocolMatcher(legacy_store_common.WakuLegacyStoreCodec) ) proc mountLegacyStoreClient*(node: WakuNode) = @@ -771,7 +771,7 @@ proc query*( if node.wakuLegacyStoreClient.isNil(): return err("waku legacy store client is nil") - let peerOpt = node.peerManager.selectPeer(legacy_store_common.WakuStoreCodec) + let peerOpt = node.peerManager.selectPeer(legacy_store_common.WakuLegacyStoreCodec) if peerOpt.isNone(): error "no suitable remote peers" return err("peer_not_found_failure") diff --git a/waku/waku_api/rest/admin/handlers.nim b/waku/waku_api/rest/admin/handlers.nim index 7574683ab6..b0dc762b6f 100644 --- a/waku/waku_api/rest/admin/handlers.nim +++ b/waku/waku_api/rest/admin/handlers.nim @@ -14,6 +14,7 @@ import import ../../../waku_core, ../../../waku_store_legacy/common, + ../../../waku_store/common, ../../../waku_filter_v2, ../../../waku_lightpush/common, ../../../waku_relay, @@ -43,7 +44,6 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = var peers: WakuPeers = @[] if not node.wakuRelay.isNil(): - # Map managed peers to WakuPeers and add to return list let relayPeers = node.peerManager.peerStore.peers(WakuRelayCodec).mapIt( ( multiaddr: constructMultiaddrStr(it), @@ -54,7 +54,6 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = tuplesToWakuPeers(peers, relayPeers) if not node.wakuFilter.isNil(): - # Map WakuFilter peers to WakuPeers and add to return list let filterV2Peers = node.peerManager.peerStore .peers(WakuFilterSubscribeCodec) .mapIt( @@ -66,8 +65,7 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = ) tuplesToWakuPeers(peers, filterV2Peers) - if not node.wakuLegacyStore.isNil(): - # Map WakuStore peers to WakuPeers and add to return list + if not node.wakuStore.isNil(): let storePeers = node.peerManager.peerStore.peers(WakuStoreCodec).mapIt( ( multiaddr: constructMultiaddrStr(it), @@ -77,6 +75,18 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = ) tuplesToWakuPeers(peers, storePeers) + if not node.wakuLegacyStore.isNil(): + let legacyStorePeers = node.peerManager.peerStore + .peers(WakuLegacyStoreCodec) + .mapIt( + ( + multiaddr: constructMultiaddrStr(it), + protocol: WakuLegacyStoreCodec, + connected: it.connectedness == Connectedness.Connected, + ) + ) + tuplesToWakuPeers(peers, legacyStorePeers) + if not node.wakuLightPush.isNil(): # Map WakuStore peers to WakuPeers and add to return list let lightpushPeers = node.peerManager.peerStore.peers(WakuLightPushCodec).mapIt( diff --git a/waku/waku_api/rest/legacy_store/handlers.nim b/waku/waku_api/rest/legacy_store/handlers.nim index fdf23958e1..b05ece356e 100644 --- a/waku/waku_api/rest/legacy_store/handlers.nim +++ b/waku/waku_api/rest/legacy_store/handlers.nim @@ -243,7 +243,7 @@ proc installStoreApiHandlers*( return RestApiResponse.badRequest(error) let peerAddr = parsedPeerAddr.valueOr: - node.peerManager.selectPeer(WakuStoreCodec).valueOr: + node.peerManager.selectPeer(WakuLegacyStoreCodec).valueOr: let handler = discHandler.valueOr: return NoPeerNoDiscError diff --git a/waku/waku_lightpush/protocol.nim b/waku/waku_lightpush/protocol.nim index 16be3beb50..02bca0582b 100644 --- a/waku/waku_lightpush/protocol.nim +++ b/waku/waku_lightpush/protocol.nim @@ -32,7 +32,6 @@ proc handleRequest*( let reqDecodeRes = PushRPC.decode(buffer) var isSuccess = false - isRejectedDueRateLimit = false pushResponseInfo = "" requestId = "" @@ -40,16 +39,7 @@ proc handleRequest*( pushResponseInfo = decodeRpcFailure & ": " & $reqDecodeRes.error elif reqDecodeRes.get().request.isNone(): pushResponseInfo = emptyRequestBodyFailure - elif wl.requestRateLimiter.isSome() and not wl.requestRateLimiter.get().tryConsume(1): - isRejectedDueRateLimit = true - let pushRpcRequest = reqDecodeRes.get() - debug "lightpush request rejected due rate limit exceeded", - peerId = peerId, requestId = pushRpcRequest.requestId - pushResponseInfo = TooManyRequestsMessage - waku_service_requests_rejected.inc(labelValues = ["Lightpush"]) else: - waku_service_requests.inc(labelValues = ["Lightpush"]) - let pushRpcRequest = reqDecodeRes.get() requestId = pushRpcRequest.requestId @@ -70,7 +60,7 @@ proc handleRequest*( isSuccess = handleRes.isOk() pushResponseInfo = (if isSuccess: "OK" else: handleRes.error) - if not isSuccess and not isRejectedDueRateLimit: + if not isSuccess: waku_lightpush_errors.inc(labelValues = [pushResponseInfo]) error "failed to push message", error = pushResponseInfo let response = PushResponse(isSuccess: isSuccess, info: some(pushResponseInfo)) @@ -79,10 +69,35 @@ proc handleRequest*( proc initProtocolHandler(wl: WakuLightPush) = proc handle(conn: Connection, proto: string) {.async.} = - let buffer = await conn.readLp(DefaultMaxRpcSize) - let rpc = await handleRequest(wl, conn.peerId, buffer) + var rpc: PushRPC + wl.requestRateLimiter.checkUsageLimit(WakuLightPushCodec, conn): + let buffer = await conn.readLp(DefaultMaxRpcSize) + + waku_service_inbound_network_bytes.inc( + amount = buffer.len().int64, labelValues = [WakuLightPushCodec] + ) + + rpc = await handleRequest(wl, conn.peerId, buffer) + do: + debug "lightpush request rejected due rate limit exceeded", + peerId = conn.peerId, limit = $wl.requestRateLimiter + + rpc = static( + PushRPC( + ## We will not copy and decode RPC buffer from stream only for requestId + ## in reject case as it is comparably too expensive and opens possible + ## attack surface + requestId: "N/A", + response: + some(PushResponse(isSuccess: false, info: some(TooManyRequestsMessage))), + ) + ) + await conn.writeLp(rpc.encode().buffer) + ## For lightpush might not worth to measure outgoing trafic as it is only + ## small respones about success/failure + wl.handler = handle wl.codec = WakuLightPushCodec diff --git a/waku/waku_store/protocol.nim b/waku/waku_store/protocol.nim index 3d02f1e706..357d303b25 100644 --- a/waku/waku_store/protocol.nim +++ b/waku/waku_store/protocol.nim @@ -43,7 +43,7 @@ type WakuStore* = ref object of LPProtocol ## Protocol -proc handleQueryRequest*( +proc handleQueryRequest( self: WakuStore, requestor: PeerId, raw_request: seq[byte] ): Future[seq[byte]] {.async.} = var res = StoreQueryResponse() @@ -59,21 +59,6 @@ proc handleQueryRequest*( let requestId = req.requestId - if self.requestRateLimiter.isSome() and not self.requestRateLimiter.get().tryConsume( - 1 - ): - debug "store query request rejected due rate limit exceeded", - peerId = $requestor, requestId = requestId - - res.statusCode = uint32(ErrorCode.TOO_MANY_REQUESTS) - res.statusDesc = $ErrorCode.TOO_MANY_REQUESTS - - waku_service_requests_rejected.inc(labelValues = ["Store"]) - - return res.encode().buffer - - waku_service_requests.inc(labelValues = ["Store"]) - info "received store query request", peerId = requestor, requestId = requestId, request = req waku_store_queries.inc() @@ -99,15 +84,34 @@ proc handleQueryRequest*( return res.encode().buffer proc initProtocolHandler(self: WakuStore) = + let rejectReposnseBuffer = StoreQueryResponse( + ## We will not copy and decode RPC buffer from stream only for requestId + ## in reject case as it is comparably too expensive and opens possible + ## attack surface + requestId: "N/A", + statusCode: uint32(ErrorCode.TOO_MANY_REQUESTS), + statusDesc: $ErrorCode.TOO_MANY_REQUESTS, + ).encode().buffer + proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = - let readRes = catch: - await conn.readLp(DefaultMaxRpcSize.int) + var resBuf: seq[byte] + self.requestRateLimiter.checkUsageLimit(WakuStoreCodec, conn): + let readRes = catch: + await conn.readLp(DefaultMaxRpcSize.int) - let reqBuf = readRes.valueOr: - error "Connection read error", error = error.msg - return + let reqBuf = readRes.valueOr: + error "Connection read error", error = error.msg + return - let resBuf = await self.handleQueryRequest(conn.peerId, reqBuf) + waku_service_inbound_network_bytes.inc( + amount = reqBuf.len().int64, labelValues = [WakuStoreCodec] + ) + + resBuf = await self.handleQueryRequest(conn.peerId, reqBuf) + do: + debug "store query request rejected due rate limit exceeded", + peerId = conn.peerId, limit = $self.requestRateLimiter + resBuf = rejectReposnseBuffer let writeRes = catch: await conn.writeLp(resBuf) @@ -116,6 +120,10 @@ proc initProtocolHandler(self: WakuStore) = error "Connection write error", error = writeRes.error.msg return + waku_service_outbound_network_bytes.inc( + amount = resBuf.len().int64, labelValues = [WakuStoreCodec] + ) + self.handler = handler self.codec = WakuStoreCodec diff --git a/waku/waku_store_legacy/client.nim b/waku/waku_store_legacy/client.nim index 1ad0069e9b..9f72ed3fc4 100644 --- a/waku/waku_store_legacy/client.nim +++ b/waku/waku_store_legacy/client.nim @@ -39,7 +39,7 @@ proc new*( proc sendHistoryQueryRPC( w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo ): Future[HistoryResult] {.async, gcsafe.} = - let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec) + let connOpt = await w.peerManager.dialPeer(peer, WakuLegacyStoreCodec) if connOpt.isNone(): waku_legacy_store_errors.inc(labelValues = [dialFailure]) return err(HistoryError(kind: HistoryErrorKind.PEER_DIAL_FAILURE, address: $peer)) @@ -217,7 +217,7 @@ when defined(waku_exp_store_resume): else: debug "no candidate list is provided, selecting a random peer" # if no peerList is set then query from one of the peers stored in the peer manager - let peerOpt = w.peerManager.selectPeer(WakuStoreCodec) + let peerOpt = w.peerManager.selectPeer(WakuLegacyStoreCodec) if peerOpt.isNone(): warn "no suitable remote peers" waku_legacy_store_errors.inc(labelValues = [peerNotFoundFailure]) diff --git a/waku/waku_store_legacy/common.nim b/waku/waku_store_legacy/common.nim index 67af41a68a..0de36fe192 100644 --- a/waku/waku_store_legacy/common.nim +++ b/waku/waku_store_legacy/common.nim @@ -7,7 +7,7 @@ import std/[options, sequtils], stew/results, stew/byteutils, nimcrypto/sha2 import ../waku_core, ../common/paging const - WakuStoreCodec* = "/vac/waku/store/2.0.0-beta4" + WakuLegacyStoreCodec* = "/vac/waku/store/2.0.0-beta4" DefaultPageSize*: uint64 = 20 diff --git a/waku/waku_store_legacy/protocol.nim b/waku/waku_store_legacy/protocol.nim index c50d8f9388..c0ede7b85f 100644 --- a/waku/waku_store_legacy/protocol.nim +++ b/waku/waku_store_legacy/protocol.nim @@ -44,77 +44,103 @@ type WakuStore* = ref object of LPProtocol ## Protocol -proc initProtocolHandler(ws: WakuStore) = - proc handler(conn: Connection, proto: string) {.async.} = - let buf = await conn.readLp(DefaultMaxRpcSize.int) - - let decodeRes = HistoryRPC.decode(buf) - if decodeRes.isErr(): - error "failed to decode rpc", peerId = $conn.peerId - waku_legacy_store_errors.inc(labelValues = [decodeRpcFailure]) - # TODO: Return (BAD_REQUEST, cause: "decode rpc failed") - return - - let reqRpc = decodeRes.value - - if reqRpc.query.isNone(): - error "empty query rpc", peerId = $conn.peerId, requestId = reqRpc.requestId - waku_legacy_store_errors.inc(labelValues = [emptyRpcQueryFailure]) - # TODO: Return (BAD_REQUEST, cause: "empty query") - return - - if ws.requestRateLimiter.isSome() and not ws.requestRateLimiter.get().tryConsume(1): - trace "store query request rejected due rate limit exceeded", - peerId = $conn.peerId, requestId = reqRpc.requestId - let error = HistoryError(kind: HistoryErrorKind.TOO_MANY_REQUESTS).toRPC() - let response = HistoryResponseRPC(error: error) - let rpc = HistoryRPC(requestId: reqRpc.requestId, response: some(response)) - await conn.writeLp(rpc.encode().buffer) - waku_service_requests_rejected.inc(labelValues = ["Store"]) - return - - waku_service_requests.inc(labelValues = ["Store"]) +proc handleLegacyQueryRequest( + self: WakuStore, requestor: PeerId, raw_request: seq[byte] +): Future[seq[byte]] {.async.} = + let decodeRes = HistoryRPC.decode(raw_request) + if decodeRes.isErr(): + error "failed to decode rpc", peerId = requestor + waku_legacy_store_errors.inc(labelValues = [decodeRpcFailure]) + # TODO: Return (BAD_REQUEST, cause: "decode rpc failed") + return + + let reqRpc = decodeRes.value + + if reqRpc.query.isNone(): + error "empty query rpc", peerId = requestor, requestId = reqRpc.requestId + waku_legacy_store_errors.inc(labelValues = [emptyRpcQueryFailure]) + # TODO: Return (BAD_REQUEST, cause: "empty query") + return + + let + requestId = reqRpc.requestId + request = reqRpc.query.get().toAPI() + + info "received history query", + peerId = requestor, requestId = requestId, query = request + waku_legacy_store_queries.inc() + + var responseRes: HistoryResult + try: + responseRes = await self.queryHandler(request) + except Exception: + error "history query failed", + peerId = requestor, requestId = requestId, error = getCurrentExceptionMsg() + + let error = HistoryError(kind: HistoryErrorKind.UNKNOWN).toRPC() + let response = HistoryResponseRPC(error: error) + return HistoryRPC(requestId: requestId, response: some(response)).encode().buffer + + if responseRes.isErr(): + error "history query failed", + peerId = requestor, requestId = requestId, error = responseRes.error - let - requestId = reqRpc.requestId - request = reqRpc.query.get().toAPI() - - info "received history query", - peerId = conn.peerId, requestId = requestId, query = request - waku_legacy_store_queries.inc() + let response = responseRes.toRPC() + return HistoryRPC(requestId: requestId, response: some(response)).encode().buffer - var responseRes: HistoryResult - try: - responseRes = await ws.queryHandler(request) - except Exception: - error "history query failed", - peerId = $conn.peerId, requestId = requestId, error = getCurrentExceptionMsg() + let response = responseRes.toRPC() - let error = HistoryError(kind: HistoryErrorKind.UNKNOWN).toRPC() - let response = HistoryResponseRPC(error: error) - let rpc = HistoryRPC(requestId: requestId, response: some(response)) - await conn.writeLp(rpc.encode().buffer) - return + info "sending history response", + peerId = requestor, requestId = requestId, messages = response.messages.len - if responseRes.isErr(): - error "history query failed", - peerId = $conn.peerId, requestId = requestId, error = responseRes.error + return HistoryRPC(requestId: requestId, response: some(response)).encode().buffer - let response = responseRes.toRPC() - let rpc = HistoryRPC(requestId: requestId, response: some(response)) - await conn.writeLp(rpc.encode().buffer) +proc initProtocolHandler(ws: WakuStore) = + let rejectResponseBuf = HistoryRPC( + ## We will not copy and decode RPC buffer from stream only for requestId + ## in reject case as it is comparably too expensive and opens possible + ## attack surface + requestId: "N/A", + response: some( + HistoryResponseRPC( + error: HistoryError(kind: HistoryErrorKind.TOO_MANY_REQUESTS).toRPC() + ) + ), + ).encode().buffer + + proc handler(conn: Connection, proto: string) {.async, closure.} = + var resBuf: seq[byte] + ws.requestRateLimiter.checkUsageLimit(WakuLegacyStoreCodec, conn): + let readRes = catch: + await conn.readLp(DefaultMaxRpcSize.int) + + let reqBuf = readRes.valueOr: + error "Connection read error", error = error.msg + return + + waku_service_inbound_network_bytes.inc( + amount = reqBuf.len().int64, labelValues = [WakuLegacyStoreCodec] + ) + + resBuf = await ws.handleLegacyQueryRequest(conn.peerId, reqBuf) + do: + debug "Legacy store query request rejected due rate limit exceeded", + peerId = conn.peerId, limit = $ws.requestRateLimiter + resBuf = rejectResponseBuf + + let writeRes = catch: + await conn.writeLp(resBuf) + + if writeRes.isErr(): + error "Connection write error", error = writeRes.error.msg return - let response = responseRes.toRPC() - - info "sending history response", - peerId = conn.peerId, requestId = requestId, messages = response.messages.len - - let rpc = HistoryRPC(requestId: requestId, response: some(response)) - await conn.writeLp(rpc.encode().buffer) + waku_service_outbound_network_bytes.inc( + amount = resBuf.len().int64, labelValues = [WakuLegacyStoreCodec] + ) ws.handler = handler - ws.codec = WakuStoreCodec + ws.codec = WakuLegacyStoreCodec proc new*( T: type WakuStore,