diff --git a/CHANGELOG.md b/CHANGELOG.md
index b7a9469180..0cd2b70c0a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,66 @@
+## v0.33.0 (2024-09-30)
+
+#### Notes:
+
+* The `--pubsub-topic` CLI configuration has been deprecated and support for it will be removed in release v0.35.0. To migrate, use the `--shard` configuration instead. For example, instead of `--pubsub-topic=/waku/2/rs/<cluster-id>/<shard-id>`, use `--cluster-id=<cluster-id>` once and `--shard=<shard-id>` for each subscribed shard.
+* The `--rest-private` CLI configuration has been removed. Please delete any reference to it when running your nodes.
+* Introduced the `--reliability` CLI configuration, which activates the new experimental StoreV3 message confirmation protocol.
+* DOS protection configuration of the non-relay, request/response protocols has changed (see the usage sketch below).
+  * The `--request-rate-limit` and `--request-rate-period` options are no longer supported.
+  * A new `--rate-limit` CLI configuration is now available.
+    - The new flag describes the rate-limit requirements for each supported protocol. The setting can be repeated; each instance defines exactly one rate-limit option.
+    - The format is `<protocol>:volume/period`
+    - If no protocol is given, the setting is used as the default for protocols without their own setting, e.g. `80/2s`
+    - Supported protocols are: lightpush|filter|px|store|storev2|storev3
+    - `volume` must be an integer representing the number of requests allowed over the period.
+    - `period` must be an integer followed by one of the units h|m|s|ms
+    - If not set, no rate limit is applied to request/response protocols, except for the filter protocol.
+
+
+### Release highlights
+
+* A new experimental reliability protocol has been implemented, leveraging StoreV3 to confirm message delivery.
+* The Peer Exchange protocol can now be protected by rate-limit boundary checks.
+* Fine-grained configuration of DOS protection is available with this release. See "Notes" above.
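+
+A minimal usage sketch of the new flags (the shard and rate-limit values and the `wakunode2` invocation are illustrative assumptions; adjust cluster id, shards and limits to your own deployment):
+
+```shell
+# Before (deprecated): one flag per full pubsub topic
+# ./wakunode2 --pubsub-topic=/waku/2/rs/<cluster-id>/<shard-id>
+
+# After: set the cluster id once and repeat --shard per subscribed shard.
+# --rate-limit may also be repeated: a bare volume/period acts as the default,
+# and <protocol>:volume/period overrides it for a single protocol.
+./wakunode2 \
+  --cluster-id=1 \
+  --shard=0 --shard=1 \
+  --rate-limit=80/2s \
+  --rate-limit=lightpush:50/1s \
+  --rate-limit=px:30/1m
+```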
+ +### Bug Fixes + +- rejecting excess relay connections ([#3063](https://github.com/waku-org/nwaku/issues/3063)) ([8b0884c7](https://github.com/waku-org/nwaku/commit/8b0884c7)) +- make Peer Exchange's rpc status_code optional for backward compatibility ([#3059](https://github.com/waku-org/nwaku/pull/3059)) ([5afa9b13](https://github.com/waku-org/nwaku/commit/5afa9b13)) +- px protocol decode - do not treat missing response field as error ([#3054](https://github.com/waku-org/nwaku/issues/3054)) ([9b445ac4](https://github.com/waku-org/nwaku/commit/9b445ac4)) +- setting up node with modified config ([#3036](https://github.com/waku-org/nwaku/issues/3036)) ([8f289925](https://github.com/waku-org/nwaku/commit/8f289925)) +- get back health check for postgres legacy ([#3010](https://github.com/waku-org/nwaku/issues/3010)) ([5a0edff7](https://github.com/waku-org/nwaku/commit/5a0edff7)) +- libnegentropy integration ([#2996](https://github.com/waku-org/nwaku/issues/2996)) ([c3cb06ac](https://github.com/waku-org/nwaku/commit/c3cb06ac)) +- peer-exchange issue ([#2889](https://github.com/waku-org/nwaku/issues/2889)) ([43157102](https://github.com/waku-org/nwaku/commit/43157102)) + +### Changes + +- append current version in agentString which is used by the identify protocol ([#3057](https://github.com/waku-org/nwaku/pull/3057)) ([368bb3c1](https://github.com/waku-org/nwaku/commit/368bb3c1)) +- rate limit peer exchange protocol, enhanced response status in RPC ([#3035](https://github.com/waku-org/nwaku/issues/3035)) ([0a7f16a3](https://github.com/waku-org/nwaku/commit/0a7f16a3)) +- Switch libnegentropy library build from shared to static linkage ([#3041](https://github.com/waku-org/nwaku/issues/3041)) ([83f25c3e](https://github.com/waku-org/nwaku/commit/83f25c3e)) +- libwaku reduce repetitive code by adding a template handling resp returns ([#3032](https://github.com/waku-org/nwaku/issues/3032)) ([1713f562](https://github.com/waku-org/nwaku/commit/1713f562)) +- libwaku - extending the library with peer_manager and peer_exchange features ([#3026](https://github.com/waku-org/nwaku/issues/3026)) ([5ea1cf0c](https://github.com/waku-org/nwaku/commit/5ea1cf0c)) +- use submodule nph in CI to check lint ([#3027](https://github.com/waku-org/nwaku/issues/3027)) ([ce9a8c46](https://github.com/waku-org/nwaku/commit/ce9a8c46)) +- deprecating pubsub topic ([#2997](https://github.com/waku-org/nwaku/issues/2997)) ([a3cd2a1a](https://github.com/waku-org/nwaku/commit/a3cd2a1a)) +- lightpush - error metric less variable by only setting a fixed string ([#3020](https://github.com/waku-org/nwaku/issues/3020)) ([d3e6717a](https://github.com/waku-org/nwaku/commit/d3e6717a)) +- enhance libpq management ([#3015](https://github.com/waku-org/nwaku/issues/3015)) ([45319f09](https://github.com/waku-org/nwaku/commit/45319f09)) +- per limit split of PostgreSQL queries ([#3008](https://github.com/waku-org/nwaku/issues/3008)) ([e1e05afb](https://github.com/waku-org/nwaku/commit/e1e05afb)) +- Added metrics to liteprotocoltester ([#3002](https://github.com/waku-org/nwaku/issues/3002)) ([8baf627f](https://github.com/waku-org/nwaku/commit/8baf627f)) +- extending store metrics ([#2995](https://github.com/waku-org/nwaku/issues/2995)) ([fd83b42f](https://github.com/waku-org/nwaku/commit/fd83b42f)) +- Better timing and requestId detail for slower store db queries ([#2994](https://github.com/waku-org/nwaku/issues/2994)) ([e8a49b76](https://github.com/waku-org/nwaku/commit/e8a49b76)) +- remove unused setting from external_config.nim 
([#3004](https://github.com/waku-org/nwaku/issues/3004)) ([fd84363e](https://github.com/waku-org/nwaku/commit/fd84363e))
+- delivery monitor for store v3 reliability protocol ([#2977](https://github.com/waku-org/nwaku/issues/2977)) ([0f68274c](https://github.com/waku-org/nwaku/commit/0f68274c))
+
+This release supports the following [libp2p protocols](https://docs.libp2p.io/concepts/protocols/):
+| Protocol | Spec status | Protocol id |
+| ---: | :---: | :--- |
+| [`11/WAKU2-RELAY`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/11/relay.md) | `stable` | `/vac/waku/relay/2.0.0` |
+| [`12/WAKU2-FILTER`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/12/filter.md) | `draft` | `/vac/waku/filter/2.0.0-beta1`<br />`/vac/waku/filter-subscribe/2.0.0-beta1`<br />`/vac/waku/filter-push/2.0.0-beta1` |
+| [`13/WAKU2-STORE`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/13/store.md) | `draft` | `/vac/waku/store/2.0.0-beta4` |
+| [`19/WAKU2-LIGHTPUSH`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/19/lightpush.md) | `draft` | `/vac/waku/lightpush/2.0.0-beta1` |
+| [`66/WAKU2-METADATA`](https://github.com/waku-org/specs/blob/master/standards/core/metadata.md) | `raw` | `/vac/waku/metadata/1.0.0` |
+| [`WAKU-SYNC`](https://github.com/waku-org/specs/blob/feat--waku-sync/standards/core/sync.md) | `draft` | `/vac/waku/sync/1.0.0` |
+
 ## v0.32.0 (2024-08-30)
 
 #### Notes:
diff --git a/tests/waku_archive/test_driver_postgres.nim b/tests/waku_archive/test_driver_postgres.nim
index 7b808c14d8..34a4286152 100644
--- a/tests/waku_archive/test_driver_postgres.nim
+++ b/tests/waku_archive/test_driver_postgres.nim
@@ -34,16 +34,15 @@ suite "Postgres driver":
     var futures = newSeq[Future[ArchiveDriverResult[void]]](0)
 
     let beforeSleep = now()
-    for _ in 1 .. 100:
+
+    for _ in 1 .. 25:
       futures.add(driver.sleep(1))
 
     await allFutures(futures)
 
     let diff = now() - beforeSleep
-    # Actually, the diff randomly goes between 1 and 2 seconds.
-    # although in theory it should spend 1s because we establish 100
-    # connections and we spawn 100 tasks that spend ~1s each.
-    assert diff < 20_000_000_000
+
+    assert diff < 2_000_000_000 ## nanoseconds
 
   asyncTest "Insert a message":
     const contentTopic = "test-content-topic"
diff --git a/vendor/negentropy b/vendor/negentropy
index 3c2df0b899..13243f668e 160000
--- a/vendor/negentropy
+++ b/vendor/negentropy
@@ -1 +1 @@
-Subproject commit 3c2df0b899bae1213d27563e44c5a0d610d8aada
+Subproject commit 13243f668edb85ef4b660e40833d81435501325f
diff --git a/waku/common/databases/db_postgres/dbconn.nim b/waku/common/databases/db_postgres/dbconn.nim
index bc5da4ee6c..aae8d80bc6 100644
--- a/waku/common/databases/db_postgres/dbconn.nim
+++ b/waku/common/databases/db_postgres/dbconn.nim
@@ -1,7 +1,8 @@
 import
-  std/[times, strutils, asyncnet, os, sequtils],
+  std/[times, strutils, asyncnet, os, sequtils, sets],
   results,
   chronos,
+  chronos/threadsync,
   metrics,
   re,
   chronicles
@@ -11,9 +12,36 @@ include db_connector/db_postgres
 
 type DataProc* = proc(result: ptr PGresult) {.closure, gcsafe, raises: [].}
 
+type DbConnWrapper* = ref object
+  dbConn: DbConn
+  open: bool
+  preparedStmts: HashSet[string] ## [stmtName's]
+  futBecomeFree*: Future[void]
+    ## to notify the pgasyncpool that this conn is free, i.e.
not busy + ## Connection management -proc check*(db: DbConn): Result[void, string] = +proc containsPreparedStmt*(dbConnWrapper: DbConnWrapper, preparedStmt: string): bool = + return dbConnWrapper.preparedStmts.contains(preparedStmt) + +proc inclPreparedStmt*(dbConnWrapper: DbConnWrapper, preparedStmt: string) = + dbConnWrapper.preparedStmts.incl(preparedStmt) + +proc getDbConn*(dbConnWrapper: DbConnWrapper): DbConn = + return dbConnWrapper.dbConn + +proc isPgDbConnBusy*(dbConnWrapper: DbConnWrapper): bool = + if isNil(dbConnWrapper.futBecomeFree): + return false + return not dbConnWrapper.futBecomeFree.finished() + +proc isPgDbConnOpen*(dbConnWrapper: DbConnWrapper): bool = + return dbConnWrapper.open + +proc setPgDbConnOpen*(dbConnWrapper: DbConnWrapper, newOpenState: bool) = + dbConnWrapper.open = newOpenState + +proc check(db: DbConn): Result[void, string] = var message: string try: message = $db.pqErrorMessage() @@ -25,11 +53,11 @@ proc check*(db: DbConn): Result[void, string] = return ok() -proc open*(connString: string): Result[DbConn, string] = +proc openDbConn(connString: string): Result[DbConn, string] = ## Opens a new connection. var conn: DbConn = nil try: - conn = open("", "", "", connString) + conn = open("", "", "", connString) ## included from db_postgres module except DbError: return err("exception opening new connection: " & getCurrentExceptionMsg()) @@ -46,22 +74,35 @@ proc open*(connString: string): Result[DbConn, string] = return ok(conn) -proc closeDbConn*(db: DbConn) {.raises: [OSError].} = - let fd = db.pqsocket() - if fd != -1: - asyncengine.unregister(cast[asyncengine.AsyncFD](fd)) - db.close() +proc new*(T: type DbConnWrapper, connString: string): Result[T, string] = + let dbConn = openDbConn(connString).valueOr: + return err("failed to establish a new connection: " & $error) + + return ok(DbConnWrapper(dbConn: dbConn, open: true)) + +proc closeDbConn*( + dbConnWrapper: DbConnWrapper +): Result[void, string] {.raises: [OSError].} = + let fd = dbConnWrapper.dbConn.pqsocket() + if fd == -1: + return err("error file descriptor -1 in closeDbConn") + + asyncengine.unregister(cast[asyncengine.AsyncFD](fd)) + + dbConnWrapper.dbConn.close() + + return ok() proc `$`(self: SqlQuery): string = return cast[string](self) proc sendQuery( - db: DbConn, query: SqlQuery, args: seq[string] + dbConnWrapper: DbConnWrapper, query: SqlQuery, args: seq[string] ): Future[Result[void, string]] {.async.} = ## This proc can be used directly for queries that don't retrieve values back. 
- if db.status != CONNECTION_OK: - db.check().isOkOr: + if dbConnWrapper.dbConn.status != CONNECTION_OK: + dbConnWrapper.dbConn.check().isOkOr: return err("failed to connect to database: " & $error) return err("unknown reason") @@ -72,17 +113,16 @@ proc sendQuery( except DbError: return err("exception formatting the query: " & getCurrentExceptionMsg()) - let success = db.pqsendQuery(cstring(wellFormedQuery)) + let success = dbConnWrapper.dbConn.pqsendQuery(cstring(wellFormedQuery)) if success != 1: - db.check().isOkOr: + dbConnWrapper.dbConn.check().isOkOr: return err("failed pqsendQuery: " & $error) - return err("failed pqsendQuery: unknown reason") return ok() proc sendQueryPrepared( - db: DbConn, + dbConnWrapper: DbConnWrapper, stmtName: string, paramValues: openArray[string], paramLengths: openArray[int32], @@ -96,8 +136,8 @@ proc sendQueryPrepared( $paramValues.len & " " & $paramLengths.len & " " & $paramFormats.len return err("lengths discrepancies in sendQueryPrepared: " & $lengthsErrMsg) - if db.status != CONNECTION_OK: - db.check().isOkOr: + if dbConnWrapper.dbConn.status != CONNECTION_OK: + dbConnWrapper.dbConn.check().isOkOr: return err("failed to connect to database: " & $error) return err("unknown reason") @@ -110,7 +150,7 @@ proc sendQueryPrepared( const ResultFormat = 0 ## 0 for text format, 1 for binary format. - let success = db.pqsendQueryPrepared( + let success = dbConnWrapper.dbConn.pqsendQueryPrepared( stmtName, nParams, cstrArrayParams, @@ -119,7 +159,7 @@ proc sendQueryPrepared( ResultFormat, ) if success != 1: - db.check().isOkOr: + dbConnWrapper.dbConn.check().isOkOr: return err("failed pqsendQueryPrepared: " & $error) return err("failed pqsendQueryPrepared: unknown reason") @@ -127,32 +167,40 @@ proc sendQueryPrepared( return ok() proc waitQueryToFinish( - db: DbConn, rowCallback: DataProc = nil + dbConnWrapper: DbConnWrapper, rowCallback: DataProc = nil ): Future[Result[void, string]] {.async.} = ## The 'rowCallback' param is != nil when the underlying query wants to retrieve results (SELECT.) ## For other queries, like "INSERT", 'rowCallback' should be nil. - var dataAvailable = false + let futDataAvailable = newFuture[void]("futDataAvailable") + proc onDataAvailable(udata: pointer) {.gcsafe, raises: [].} = - dataAvailable = true + if not futDataAvailable.completed(): + futDataAvailable.complete() - let asyncFd = cast[asyncengine.AsyncFD](pqsocket(db)) + let asyncFd = cast[asyncengine.AsyncFD](pqsocket(dbConnWrapper.dbConn)) asyncengine.addReader2(asyncFd, onDataAvailable).isOkOr: + dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error)) return err("failed to add event reader in waitQueryToFinish: " & $error) + defer: + asyncengine.removeReader2(asyncFd).isOkOr: + return err("failed to remove event reader in waitQueryToFinish: " & $error) - while not dataAvailable: - await sleepAsync(timer.milliseconds(1)) + await futDataAvailable - ## Now retrieve the result + ## Now retrieve the result from the database while true: - let pqResult = db.pqgetResult() + let pqResult = dbConnWrapper.dbConn.pqgetResult() if pqResult == nil: - db.check().isOkOr: + dbConnWrapper.dbConn.check().isOkOr: + if not dbConnWrapper.futBecomeFree.failed(): + dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error)) return err("error in query: " & $error) - return ok() # reached the end of the results + dbConnWrapper.futBecomeFree.complete() + return ok() # reached the end of the results. 
The query is completed if not rowCallback.isNil(): rowCallback(pqResult) @@ -160,8 +208,14 @@ proc waitQueryToFinish( pqclear(pqResult) proc dbConnQuery*( - db: DbConn, query: SqlQuery, args: seq[string], rowCallback: DataProc + dbConnWrapper: DbConnWrapper, + query: SqlQuery, + args: seq[string], + rowCallback: DataProc, + requestId: string, ): Future[Result[void, string]] {.async, gcsafe.} = + dbConnWrapper.futBecomeFree = newFuture[void]("dbConnQuery") + let cleanedQuery = ($query).replace(" ", "").replace("\n", "") ## remove everything between ' or " all possible sequence of numbers. e.g. rm partition partition var querySummary = cleanedQuery.replace(re"""(['"]).*?\1""", "") @@ -170,7 +224,9 @@ proc dbConnQuery*( var queryStartTime = getTime().toUnixFloat() - (await db.sendQuery(query, args)).isOkOr: + (await dbConnWrapper.sendQuery(query, args)).isOkOr: + error "error in dbConnQuery", error = $error + dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error)) return err("error in dbConnQuery calling sendQuery: " & $error) let sendDuration = getTime().toUnixFloat() - queryStartTime @@ -178,7 +234,7 @@ proc dbConnQuery*( queryStartTime = getTime().toUnixFloat() - (await db.waitQueryToFinish(rowCallback)).isOkOr: + (await dbConnWrapper.waitQueryToFinish(rowCallback)).isOkOr: return err("error in dbConnQuery calling waitQueryToFinish: " & $error) let waitDuration = getTime().toUnixFloat() - queryStartTime @@ -188,6 +244,7 @@ proc dbConnQuery*( if "insert" notin ($query).toLower(): debug "dbConnQuery", + requestId, query = $query, querySummary, waitDurationSecs = waitDuration, @@ -196,15 +253,20 @@ proc dbConnQuery*( return ok() proc dbConnQueryPrepared*( - db: DbConn, + dbConnWrapper: DbConnWrapper, stmtName: string, paramValues: seq[string], paramLengths: seq[int32], paramFormats: seq[int32], rowCallback: DataProc, + requestId: string, ): Future[Result[void, string]] {.async, gcsafe.} = + dbConnWrapper.futBecomeFree = newFuture[void]("dbConnQueryPrepared") var queryStartTime = getTime().toUnixFloat() - db.sendQueryPrepared(stmtName, paramValues, paramLengths, paramFormats).isOkOr: + + dbConnWrapper.sendQueryPrepared(stmtName, paramValues, paramLengths, paramFormats).isOkOr: + dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error)) + error "error in dbConnQueryPrepared", error = $error return err("error in dbConnQueryPrepared calling sendQuery: " & $error) let sendDuration = getTime().toUnixFloat() - queryStartTime @@ -212,7 +274,7 @@ proc dbConnQueryPrepared*( queryStartTime = getTime().toUnixFloat() - (await db.waitQueryToFinish(rowCallback)).isOkOr: + (await dbConnWrapper.waitQueryToFinish(rowCallback)).isOkOr: return err("error in dbConnQueryPrepared calling waitQueryToFinish: " & $error) let waitDuration = getTime().toUnixFloat() - queryStartTime @@ -222,6 +284,9 @@ proc dbConnQueryPrepared*( if "insert" notin stmtName.toLower(): debug "dbConnQueryPrepared", - stmtName, waitDurationSecs = waitDuration, sendDurationSecs = sendDuration + requestId, + stmtName, + waitDurationSecs = waitDuration, + sendDurationSecs = sendDuration return ok() diff --git a/waku/common/databases/db_postgres/pgasyncpool.nim b/waku/common/databases/db_postgres/pgasyncpool.nim index 66e66bd2ff..10e70eb515 100644 --- a/waku/common/databases/db_postgres/pgasyncpool.nim +++ b/waku/common/databases/db_postgres/pgasyncpool.nim @@ -2,28 +2,22 @@ # Inspired by: https://github.com/treeform/pg/ {.push raises: [].} -import std/[sequtils, nre, strformat, sets], results, chronos, chronicles +import + 
std/[sequtils, nre, strformat], + results, + chronos, + chronos/threadsync, + chronicles, + strutils import ./dbconn, ../common, ../../../waku_core/time -type PgAsyncPoolState {.pure.} = enum - Closed - Live - Closing - -type PgDbConn = ref object - dbConn: DbConn - open: bool - busy: bool - preparedStmts: HashSet[string] ## [stmtName's] - type # Database connection pool PgAsyncPool* = ref object connString: string maxConnections: int - - state: PgAsyncPoolState - conns: seq[PgDbConn] + conns: seq[DbConnWrapper] + busySignal: ThreadSignalPtr ## signal to wait while the pool is busy proc new*(T: type PgAsyncPool, dbUrl: string, maxConnections: int): DatabaseResult[T] = var connString: string @@ -45,85 +39,61 @@ proc new*(T: type PgAsyncPool, dbUrl: string, maxConnections: int): DatabaseResu let pool = PgAsyncPool( connString: connString, maxConnections: maxConnections, - state: PgAsyncPoolState.Live, - conns: newSeq[PgDbConn](0), + conns: newSeq[DbConnWrapper](0), ) return ok(pool) -func isLive(pool: PgAsyncPool): bool = - pool.state == PgAsyncPoolState.Live - func isBusy(pool: PgAsyncPool): bool = - pool.conns.mapIt(it.busy).allIt(it) + return pool.conns.mapIt(it.isPgDbConnBusy()).allIt(it) proc close*(pool: PgAsyncPool): Future[Result[void, string]] {.async.} = ## Gracefully wait and close all openned connections - - if pool.state == PgAsyncPoolState.Closing: - while pool.state == PgAsyncPoolState.Closing: - await sleepAsync(0.milliseconds) # Do not block the async runtime - return ok() - - pool.state = PgAsyncPoolState.Closing - # wait for the connections to be released and close them, without # blocking the async runtime - while pool.conns.anyIt(it.busy): - await sleepAsync(0.milliseconds) - for i in 0 ..< pool.conns.len: - if pool.conns[i].busy: - continue + debug "close PgAsyncPool" + await allFutures(pool.conns.mapIt(it.futBecomeFree)) + debug "closing all connection PgAsyncPool" for i in 0 ..< pool.conns.len: - if pool.conns[i].open: - pool.conns[i].dbConn.closeDbConn() - pool.conns[i].busy = false - pool.conns[i].open = false + if pool.conns[i].isPgDbConnOpen(): + pool.conns[i].closeDbConn().isOkOr: + return err("error in close PgAsyncPool: " & $error) + pool.conns[i].setPgDbConnOpen(false) pool.conns.setLen(0) - pool.state = PgAsyncPoolState.Closed return ok() proc getFirstFreeConnIndex(pool: PgAsyncPool): DatabaseResult[int] = for index in 0 ..< pool.conns.len: - if pool.conns[index].busy: + if pool.conns[index].isPgDbConnBusy(): continue ## Pick up the first free connection and set it busy - pool.conns[index].busy = true return ok(index) proc getConnIndex(pool: PgAsyncPool): Future[DatabaseResult[int]] {.async.} = ## Waits for a free connection or create if max connections limits have not been reached. ## Returns the index of the free connection - if not pool.isLive(): - return err("pool is not live") - if not pool.isBusy(): return pool.getFirstFreeConnIndex() ## Pool is busy then - if pool.conns.len == pool.maxConnections: ## Can't create more connections. Wait for a free connection without blocking the async runtime. 
- while pool.isBusy(): - await sleepAsync(0.milliseconds) + let busyFuts = pool.conns.mapIt(it.futBecomeFree) + discard await one(busyFuts) return pool.getFirstFreeConnIndex() elif pool.conns.len < pool.maxConnections: ## stablish a new connection - let conn = dbconn.open(pool.connString).valueOr: - return err("failed to stablish a new connection: " & $error) + let dbConn = DbConnWrapper.new(pool.connString).valueOr: + return err("error creating DbConnWrapper: " & $error) - pool.conns.add( - PgDbConn( - dbConn: conn, open: true, busy: true, preparedStmts: initHashSet[string]() - ) - ) + pool.conns.add(dbConn) return ok(pool.conns.len - 1) proc resetConnPool*(pool: PgAsyncPool): Future[DatabaseResult[void]] {.async.} = @@ -131,22 +101,12 @@ proc resetConnPool*(pool: PgAsyncPool): Future[DatabaseResult[void]] {.async.} = ## This proc is intended to be called when the connection with the database ## got interrupted from the database side or a connectivity problem happened. - for i in 0 ..< pool.conns.len: - pool.conns[i].busy = false - (await pool.close()).isOkOr: return err("error in resetConnPool: " & error) - pool.state = PgAsyncPoolState.Live return ok() -proc releaseConn(pool: PgAsyncPool, conn: DbConn) = - ## Marks the connection as released. - for i in 0 ..< pool.conns.len: - if pool.conns[i].dbConn == conn: - pool.conns[i].busy = false - -const SlowQueryThresholdInNanoSeconds = 2_000_000_000 +const SlowQueryThreshold = 1.seconds proc pgQuery*( pool: PgAsyncPool, @@ -159,15 +119,14 @@ proc pgQuery*( return err("connRes.isErr in query: " & $error) let queryStartTime = getNowInNanosecondTime() - let conn = pool.conns[connIndex].dbConn + let dbConnWrapper = pool.conns[connIndex] defer: - pool.releaseConn(conn) let queryDuration = getNowInNanosecondTime() - queryStartTime - if queryDuration > SlowQueryThresholdInNanoSeconds: + if queryDuration > SlowQueryThreshold.nanos: debug "pgQuery slow query", query_duration_secs = (queryDuration / 1_000_000_000), query, requestId - (await conn.dbConnQuery(sql(query), args, rowCallback)).isOkOr: + (await dbConnWrapper.dbConnQuery(sql(query), args, rowCallback, requestId)).isOkOr: return err("error in asyncpool query: " & $error) return ok() @@ -192,33 +151,32 @@ proc runStmt*( let connIndex = (await pool.getConnIndex()).valueOr: return err("Error in runStmt: " & $error) - let conn = pool.conns[connIndex].dbConn + let dbConnWrapper = pool.conns[connIndex] let queryStartTime = getNowInNanosecondTime() defer: - pool.releaseConn(conn) let queryDuration = getNowInNanosecondTime() - queryStartTime - if queryDuration > SlowQueryThresholdInNanoSeconds: + if queryDuration > SlowQueryThreshold.nanos: debug "runStmt slow query", query_duration = queryDuration / 1_000_000_000, query = stmtDefinition, requestId - if not pool.conns[connIndex].preparedStmts.contains(stmtName): + if not pool.conns[connIndex].containsPreparedStmt(stmtName): # The connection doesn't have that statement yet. Let's create it. # Each session/connection has its own prepared statements. 
let res = catch: let len = paramValues.len - discard conn.prepare(stmtName, sql(stmtDefinition), len) + discard dbConnWrapper.getDbConn().prepare(stmtName, sql(stmtDefinition), len) if res.isErr(): return err("failed prepare in runStmt: " & res.error.msg) - pool.conns[connIndex].preparedStmts.incl(stmtName) + pool.conns[connIndex].inclPreparedStmt(stmtName) ( - await conn.dbConnQueryPrepared( - stmtName, paramValues, paramLengths, paramFormats, rowCallback + await dbConnWrapper.dbConnQueryPrepared( + stmtName, paramValues, paramLengths, paramFormats, rowCallback, requestId ) ).isOkOr: return err("error in runStmt: " & $error) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 5aa78203df..10151ee15f 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -336,8 +336,6 @@ proc subscribe*( error "Invalid API call to `subscribe`. Was already subscribed" return - debug "subscribe", pubsubTopic = pubsubTopic - node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic)) node.registerRelayDefaultHandler(pubsubTopic) diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index a69fdb9f46..bfdffa6bf2 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -123,9 +123,9 @@ const SelectWithCursorNoDataAscStmtDef = timestamp <= $7 ORDER BY timestamp ASC, messageHash ASC LIMIT $8;""" -const SelectCursorByHashName = "SelectMessageByHash" +const SelectCursorByHashName = "SelectMessageByHashInMessagesLookup" const SelectCursorByHashDef = - """SELECT timestamp FROM messages + """SELECT timestamp FROM messages_lookup WHERE messageHash = $1""" const diff --git a/waku/waku_peer_exchange/protocol.nim b/waku/waku_peer_exchange/protocol.nim index 9462533780..27bb7bfb43 100644 --- a/waku/waku_peer_exchange/protocol.nim +++ b/waku/waku_peer_exchange/protocol.nim @@ -20,9 +20,10 @@ declarePublicGauge waku_px_peers_received_total, "number of ENRs received via peer exchange" declarePublicGauge waku_px_peers_received_unknown, "number of previously unknown ENRs received via peer exchange" -declarePublicGauge waku_px_peers_sent, "number of ENRs sent to peer exchange requesters" +declarePublicCounter waku_px_peers_sent, + "number of ENRs sent to peer exchange requesters" declarePublicGauge waku_px_peers_cached, "number of peer exchange peer ENRs cached" -declarePublicGauge waku_px_errors, "number of peer exchange errors", ["type"] +declarePublicCounter waku_px_errors, "number of peer exchange errors", ["type"] logScope: topics = "waku peer_exchange" @@ -217,7 +218,9 @@ proc populateEnrCache(wpx: WakuPeerExchange) = proc updatePxEnrCache(wpx: WakuPeerExchange) {.async.} = # try more aggressively to fill the cache at startup - while wpx.enrCache.len < MaxPeersCacheSize: + var attempts = 10 + while wpx.enrCache.len < MaxPeersCacheSize and attempts > 0: + attempts -= 1 wpx.populateEnrCache() await sleepAsync(5.seconds) diff --git a/waku/waku_peer_exchange/rpc_codec.nim b/waku/waku_peer_exchange/rpc_codec.nim index e5e982938f..b698ffe9f5 100644 --- a/waku/waku_peer_exchange/rpc_codec.nim +++ b/waku/waku_peer_exchange/rpc_codec.nim @@ -71,7 +71,11 @@ proc decode*(T: type PeerExchangeResponse, buffer: seq[byte]): ProtobufResult[T] if ?pb.getField(10, status_code): rpc.status_code = PeerExchangeResponseStatusCode.parse(status_code) else: - return err(ProtobufError.missingRequiredField("status_code")) + # older peers may 
not support status_code field yet + if rpc.peerInfos.len() > 0: + rpc.status_code = PeerExchangeResponseStatusCode.SUCCESS + else: + rpc.status_code = PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE var status_desc: string if ?pb.getField(11, status_desc): @@ -103,8 +107,9 @@ proc decode*(T: type PeerExchangeRpc, buffer: seq[byte]): ProtobufResult[T] = var responseBuffer: seq[byte] if not ?pb.getField(2, responseBuffer): - return err(ProtobufError.missingRequiredField("response")) - - rpc.response = ?PeerExchangeResponse.decode(responseBuffer) + rpc.response = + PeerExchangeResponse(status_code: PeerExchangeResponseStatusCode.UNKNOWN) + else: + rpc.response = ?PeerExchangeResponse.decode(responseBuffer) ok(rpc)