Skip to content

Commit

Permalink
Merge branch 'master' into feat-rate-limit-lightpush-store-phase1
Browse files Browse the repository at this point in the history
  • Loading branch information
NagyZoltanPeter authored Apr 15, 2024
2 parents 8e680f6 + 4117fe6 commit 994251a
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 20 deletions.
10 changes: 4 additions & 6 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
# BUILD NIM APP ----------------------------------------------------------------

# alpine:edge supports building rust binaries, alpine:3.16 doesn't for some reason
FROM alpine@sha256:3e44438281baf26907675b99c9a4a421c4d4a57c954120327e703aa8329086bd AS nim-build
FROM rust:1.77.1-alpine3.18 AS nim-build

ARG NIMFLAGS
ARG MAKE_TARGET=wakunode2
ARG NIM_COMMIT
ARG LOG_LEVEL=TRACE

# Get build tools and required header files
RUN apk add --no-cache bash git build-base pcre-dev linux-headers curl jq rust cargo
RUN apk add --no-cache bash git build-base pcre-dev linux-headers curl jq

WORKDIR /app
COPY . .
Expand All @@ -29,7 +27,7 @@ RUN make -j$(nproc) ${NIM_COMMIT} $MAKE_TARGET LOG_LEVEL=${LOG_LEVEL} NIMFLAGS="

# PRODUCTION IMAGE -------------------------------------------------------------

FROM alpine:3.16 as prod
FROM alpine:3.18 as prod

ARG MAKE_TARGET=wakunode2

Expand Down Expand Up @@ -66,7 +64,7 @@ CMD ["--help"]
# DEBUG IMAGE ------------------------------------------------------------------

# Build debug tools: heaptrack
FROM alpine:3.16 AS heaptrack-build
FROM alpine:3.18 AS heaptrack-build

RUN apk update
RUN apk add -- gdb git g++ make cmake zlib-dev boost-dev libunwind-dev
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
const ContentScriptVersion_3* =
"""
CREATE INDEX IF NOT EXISTS i_query ON messages
(contentTopic, pubsubTopic, storedAt, id);
UPDATE version SET version = 3 WHERE version = 2;
"""
3 changes: 2 additions & 1 deletion migrations/message_store_postgres/pg_migration_manager.nim
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import content_script_version_1, content_script_version_2
import content_script_version_1, content_script_version_2, content_script_version_3

type MigrationScript* = object
version*: int
Expand All @@ -11,6 +11,7 @@ const PgMigrationScripts* =
@[
MigrationScript(version: 1, scriptContent: ContentScriptVersion_1),
MigrationScript(version: 2, scriptContent: ContentScriptVersion_2),
MigrationScript(version: 3, scriptContent: ContentScriptVersion_3),
]

proc getMigrationScripts*(currentVersion: int64, targetVersion: int64): seq[string] =
Expand Down
2 changes: 1 addition & 1 deletion vendor/nim-chronicles
3 changes: 2 additions & 1 deletion waku/waku_api/rest/store/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ else:

import
chronicles, json_serialization, json_serialization/std/options, presto/[route, client]
import ../../../waku_store/common, ../serdes, ../responses, ./types
import
../../../waku_store/common, ../../../common/base64, ../serdes, ../responses, ./types

export types

Expand Down
4 changes: 2 additions & 2 deletions waku/waku_api/rest/store/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ proc writeValue*(
writer.beginRecord()
writer.writeField("payload", $value.payload)
if value.contentTopic.isSome():
writer.writeField("content_topic", value.contentTopic.get())
writer.writeField("contentTopic", value.contentTopic.get())
if value.version.isSome():
writer.writeField("version", value.version.get())
if value.timestamp.isSome():
Expand Down Expand Up @@ -176,7 +176,7 @@ proc readValue*(
case fieldName
of "payload":
payload = some(reader.readValue(Base64String))
of "content_topic":
of "contentTopic":
contentTopic = some(reader.readValue(ContentTopic))
of "version":
version = some(reader.readValue(uint32))
Expand Down
2 changes: 1 addition & 1 deletion waku/waku_archive/driver/postgres_driver/migrations.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import
logScope:
topics = "waku archive migration"

const SchemaVersion* = 2 # increase this when there is an update in the database schema
const SchemaVersion* = 3 # increase this when there is an update in the database schema

proc breakIntoStatements*(script: string): seq[string] =
## Given a full migration script, that can potentially contain a list
Expand Down
2 changes: 1 addition & 1 deletion waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ proc init(db: SqliteDatabase): ArchiveDriverResult[void] =

let resMsgIndex = createHistoryQueryIndex(db)
if resMsgIndex.isErr():
return err("failed to create i_msg index: " & resMsgIndex.error())
return err("failed to create i_query index: " & resMsgIndex.error())

return ok()

Expand Down
42 changes: 35 additions & 7 deletions waku/waku_rln_relay/group_manager/on_chain/group_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,19 @@ proc startListeningToEvents(
let newBlockCallback = g.getNewBlockCallback()
g.runInInterval(newBlockCallback, DefaultBlockPollRate)

proc batchAwaitBlockHandlingFuture(
g: OnchainGroupManager, futs: seq[Future[bool]]
): Future[void] {.async: (raises: [Exception]).} =
for fut in futs:
try:
var handleBlockRes: bool
g.retryWrapper(handleBlockRes, "Failed to handle block"):
await fut
except CatchableError:
raise newException(
CatchableError, "could not fetch events from block: " & getCurrentExceptionMsg()
)

proc startOnchainSync(
g: OnchainGroupManager
): Future[void] {.async: (raises: [Exception]).} =
Expand All @@ -606,6 +619,10 @@ proc startOnchainSync(

# static block chunk size
let blockChunkSize = 2_000
# delay between rpc calls to not overload the rate limit
let rpcDelay = 200.milliseconds
# max number of futures to run concurrently
let maxFutures = 10

var fromBlock =
if g.latestProcessedBlock > g.rlnContractDeployedBlockNumber:
Expand All @@ -616,25 +633,36 @@ proc startOnchainSync(
blockNumber = g.rlnContractDeployedBlockNumber
g.rlnContractDeployedBlockNumber

var futs = newSeq[Future[bool]]()
var currentLatestBlock: BlockNumber
g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"):
cast[BlockNumber](await ethRpc.provider.eth_blockNumber())

try:
# we always want to sync from last processed block => latest
# chunk events
while true:
var currentLatestBlock: BlockNumber
g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"):
cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
# if the fromBlock is less than 2k blocks behind the current block
# then fetch the new toBlock
if fromBlock >= currentLatestBlock:
break

if fromBlock + blockChunkSize.uint > currentLatestBlock.uint:
g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"):
cast[BlockNumber](await ethRpc.provider.eth_blockNumber())


let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock)
debug "fetching events", fromBlock = fromBlock, toBlock = toBlock
var handleBlockRes: bool
g.retryWrapper(handleBlockRes, "Failed to handle old blocks"):
await g.getAndHandleEvents(fromBlock, toBlock)
await sleepAsync(rpcDelay)
futs.add(g.getAndHandleEvents(fromBlock, toBlock))
if futs.len >= maxFutures or toBlock == currentLatestBlock:
await g.batchAwaitBlockHandlingFuture(futs)
futs = newSeq[Future[bool]]()
fromBlock = toBlock + 1
except CatchableError:
raise newException(
ValueError,
CatchableError,
"failed to get the history/reconcile missed blocks: " & getCurrentExceptionMsg(),
)

Expand Down

0 comments on commit 994251a

Please sign in to comment.