Merge branch 'master' into chore-debug-excess-connections
gabrielmer authored Oct 2, 2024
2 parents 5d40f73 + e406673 commit f1b12ca
Showing 9 changed files with 222 additions and 131 deletions.
63 changes: 63 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,66 @@
## v0.33.0 (2024-09-30)

#### Notes:

* The `--pubsub-topic` CLI configuration has been deprecated and support for it will be removed in release v0.35.0. To migrate, use the `--shard` configuration instead. For example, instead of `--pubsub-topic=/waku/2/rs/<CLUSTER_ID>/<SHARD_ID>`, use `--cluster-id=<CLUSTER_ID>` once and `--shard=<SHARD_ID>` for each subscribed shard.
* The `--rest-private` CLI configuration has been removed. Please delete any reference to it when running your nodes.
* Introduced the `--reliability` CLI configuration, activating the new experimental StoreV3 message confirmation protocol
* DOS protection configurations of non-relay, req/resp protocols have changed:
  * The `--request-rate-limit` and `--request-rate-period` options are no longer supported.
  * The `--rate-limit` CLI configuration is now available.
    - The new flag can describe the rate-limit requirements of each supported protocol. The setting can be repeated; each instance defines exactly one rate-limit option.
    - The format is `<protocol>:volume/period<time-unit>`.
    - If the protocol is not given, the setting is used as the default for protocols without their own setting. Example: `80/2s`.
    - Supported protocols are: `lightpush|filter|px|store|storev2|storev3`.
    - `volume` must be an integer, the number of requests allowed over the period.
    - `period<time-unit>` must be an integer followed by one of the units `h|m|s|ms`.
    - If not set, no rate limit is applied to request/response protocols, except for the filter protocol.
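
The `--pubsub-topic` migration described in the notes above can be sketched as a small conversion routine. This is only an illustration, not code from nwaku: `shardFlags` is a hypothetical helper, and the example topics (cluster `1`, shards `0` and `4`) are made up. It assumes every topic follows the `/waku/2/rs/<CLUSTER_ID>/<SHARD_ID>` shape and that all topics belong to a single cluster.

```nim
import std/strutils

proc shardFlags(pubsubTopics: openArray[string]): seq[string] =
  ## Hypothetical helper (not part of nwaku): turns deprecated
  ## `--pubsub-topic` values into `--cluster-id` / `--shard` flags.
  var clusterId = ""
  for topic in pubsubTopics:
    # "/waku/2/rs/1/0" splits into ["", "waku", "2", "rs", "1", "0"]
    let parts = topic.split('/')
    if parts.len != 6 or parts[0] != "" or parts[1] != "waku" or
        parts[2] != "2" or parts[3] != "rs" or parts[4].len == 0:
      raise newException(ValueError, "not a static sharding topic: " & topic)
    if clusterId.len == 0:
      # The cluster id is emitted once, from the first topic.
      clusterId = parts[4]
      result.add("--cluster-id=" & clusterId)
    elif parts[4] != clusterId:
      raise newException(ValueError, "topics span more than one cluster")
    # One --shard flag per subscribed shard.
    result.add("--shard=" & parts[5])

when isMainModule:
  echo shardFlags(["/waku/2/rs/1/0", "/waku/2/rs/1/4"]).join(" ")
```

The helper rejects topics from a second cluster because the note says `--cluster-id` is given once.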


### Release highlights

* A new experimental reliability protocol has been implemented, leveraging StoreV3 to confirm message delivery.
* Peer Exchange protocol can now be protected by rate-limit boundary checks.
* Fine-grained configuration of DOS protection is available with this release. See "Notes" above.
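
To make the `--rate-limit` format from the notes concrete, here is a minimal parsing sketch. It is not the parser nwaku uses; `RateLimit` and `parseRateLimit` are hypothetical names, and the values `store:100/1s` and `px:5/500ms` are invented examples (only `80/2s` comes from the notes). It assumes the format `<protocol>:volume/period<time-unit>` with an optional protocol prefix, and it normalizes the period to milliseconds.

```nim
import std/strutils

type RateLimit = object
  protocol: string  # "" means: default for protocols without their own setting
  volume: int       # number of requests allowed per period
  periodMs: int     # period length, normalized to milliseconds

const Protocols = ["lightpush", "filter", "px", "store", "storev2", "storev3"]

proc parseRateLimit(setting: string): RateLimit =
  ## Hypothetical parser for one `--rate-limit` value.
  var rest = setting
  let colon = setting.find(':')
  if colon >= 0:
    result.protocol = setting[0 ..< colon]
    if result.protocol notin Protocols:
      raise newException(ValueError, "unsupported protocol: " & result.protocol)
    rest = setting[colon + 1 .. ^1]

  let parts = rest.split('/')
  if parts.len != 2:
    raise newException(ValueError, "expected volume/period<time-unit>: " & setting)
  result.volume = parseInt(parts[0])  # raises ValueError on non-integers

  # "ms" must be tested before "s", because "ms" also ends with "s".
  let period = parts[1]
  var unitMs = 0
  var digits = ""
  if period.endsWith("ms"):
    unitMs = 1
    digits = period[0 ..< period.len - 2]
  elif period.endsWith("s"):
    unitMs = 1_000
    digits = period[0 ..< period.len - 1]
  elif period.endsWith("m"):
    unitMs = 60_000
    digits = period[0 ..< period.len - 1]
  elif period.endsWith("h"):
    unitMs = 3_600_000
    digits = period[0 ..< period.len - 1]
  else:
    raise newException(ValueError, "unit must be one of h|m|s|ms: " & period)
  result.periodMs = parseInt(digits) * unitMs

when isMainModule:
  echo parseRateLimit("80/2s")
  echo parseRateLimit("store:100/1s")
```

Because the flag can be repeated, a caller would run this once per `--rate-limit` occurrence and keep the entry with the empty `protocol` as the fallback.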

### Bug Fixes

- rejecting excess relay connections ([#3063](https://github.com/waku-org/nwaku/issues/3063)) ([8b0884c7](https://github.com/waku-org/nwaku/commit/8b0884c7))
- make Peer Exchange's rpc status_code optional for backward compatibility ([#3059](https://github.com/waku-org/nwaku/pull/3059)) ([5afa9b13](https://github.com/waku-org/nwaku/commit/5afa9b13))
- px protocol decode - do not treat missing response field as error ([#3054](https://github.com/waku-org/nwaku/issues/3054)) ([9b445ac4](https://github.com/waku-org/nwaku/commit/9b445ac4))
- setting up node with modified config ([#3036](https://github.com/waku-org/nwaku/issues/3036)) ([8f289925](https://github.com/waku-org/nwaku/commit/8f289925))
- get back health check for postgres legacy ([#3010](https://github.com/waku-org/nwaku/issues/3010)) ([5a0edff7](https://github.com/waku-org/nwaku/commit/5a0edff7))
- libnegentropy integration ([#2996](https://github.com/waku-org/nwaku/issues/2996)) ([c3cb06ac](https://github.com/waku-org/nwaku/commit/c3cb06ac))
- peer-exchange issue ([#2889](https://github.com/waku-org/nwaku/issues/2889)) ([43157102](https://github.com/waku-org/nwaku/commit/43157102))

### Changes

- append current version in agentString which is used by the identify protocol ([#3057](https://github.com/waku-org/nwaku/pull/3057)) ([368bb3c1](https://github.com/waku-org/nwaku/commit/368bb3c1))
- rate limit peer exchange protocol, enhanced response status in RPC ([#3035](https://github.com/waku-org/nwaku/issues/3035)) ([0a7f16a3](https://github.com/waku-org/nwaku/commit/0a7f16a3))
- Switch libnegentropy library build from shared to static linkage ([#3041](https://github.com/waku-org/nwaku/issues/3041)) ([83f25c3e](https://github.com/waku-org/nwaku/commit/83f25c3e))
- libwaku reduce repetitive code by adding a template handling resp returns ([#3032](https://github.com/waku-org/nwaku/issues/3032)) ([1713f562](https://github.com/waku-org/nwaku/commit/1713f562))
- libwaku - extending the library with peer_manager and peer_exchange features ([#3026](https://github.com/waku-org/nwaku/issues/3026)) ([5ea1cf0c](https://github.com/waku-org/nwaku/commit/5ea1cf0c))
- use submodule nph in CI to check lint ([#3027](https://github.com/waku-org/nwaku/issues/3027)) ([ce9a8c46](https://github.com/waku-org/nwaku/commit/ce9a8c46))
- deprecating pubsub topic ([#2997](https://github.com/waku-org/nwaku/issues/2997)) ([a3cd2a1a](https://github.com/waku-org/nwaku/commit/a3cd2a1a))
- lightpush - error metric less variable by only setting a fixed string ([#3020](https://github.com/waku-org/nwaku/issues/3020)) ([d3e6717a](https://github.com/waku-org/nwaku/commit/d3e6717a))
- enhance libpq management ([#3015](https://github.com/waku-org/nwaku/issues/3015)) ([45319f09](https://github.com/waku-org/nwaku/commit/45319f09))
- per limit split of PostgreSQL queries ([#3008](https://github.com/waku-org/nwaku/issues/3008)) ([e1e05afb](https://github.com/waku-org/nwaku/commit/e1e05afb))
- Added metrics to liteprotocoltester ([#3002](https://github.com/waku-org/nwaku/issues/3002)) ([8baf627f](https://github.com/waku-org/nwaku/commit/8baf627f))
- extending store metrics ([#2995](https://github.com/waku-org/nwaku/issues/2995)) ([fd83b42f](https://github.com/waku-org/nwaku/commit/fd83b42f))
- Better timing and requestId detail for slower store db queries ([#2994](https://github.com/waku-org/nwaku/issues/2994)) ([e8a49b76](https://github.com/waku-org/nwaku/commit/e8a49b76))
- remove unused setting from external_config.nim ([#3004](https://github.com/waku-org/nwaku/issues/3004)) ([fd84363e](https://github.com/waku-org/nwaku/commit/fd84363e))
- delivery monitor for store v3 reliability protocol ([#2977](https://github.com/waku-org/nwaku/issues/2977)) ([0f68274c](https://github.com/waku-org/nwaku/commit/0f68274c))

This release supports the following [libp2p protocols](https://docs.libp2p.io/concepts/protocols/):
| Protocol | Spec status | Protocol id |
| ---: | :---: | :--- |
| [`11/WAKU2-RELAY`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/11/relay.md) | `stable` | `/vac/waku/relay/2.0.0` |
| [`12/WAKU2-FILTER`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/12/filter.md) | `draft` | `/vac/waku/filter/2.0.0-beta1` <br />`/vac/waku/filter-subscribe/2.0.0-beta1` <br />`/vac/waku/filter-push/2.0.0-beta1` |
| [`13/WAKU2-STORE`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/13/store.md) | `draft` | `/vac/waku/store/2.0.0-beta4` |
| [`19/WAKU2-LIGHTPUSH`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/19/lightpush.md) | `draft` | `/vac/waku/lightpush/2.0.0-beta1` |
| [`66/WAKU2-METADATA`](https://github.com/waku-org/specs/blob/master/standards/core/metadata.md) | `raw` | `/vac/waku/metadata/1.0.0` |
| [`WAKU-SYNC`](https://github.com/waku-org/specs/blob/feat--waku-sync/standards/core/sync.md) | `draft` | `/vac/waku/sync/1.0.0` |

## v0.32.0 (2024-08-30)

#### Notes:
9 changes: 4 additions & 5 deletions tests/waku_archive/test_driver_postgres.nim
@@ -34,16 +34,15 @@ suite "Postgres driver":
var futures = newSeq[Future[ArchiveDriverResult[void]]](0)

let beforeSleep = now()
for _ in 1 .. 100:

for _ in 1 .. 25:
futures.add(driver.sleep(1))

await allFutures(futures)

let diff = now() - beforeSleep
# Actually, the diff randomly goes between 1 and 2 seconds.
# although in theory it should spend 1s because we establish 100
# connections and we spawn 100 tasks that spend ~1s each.
assert diff < 20_000_000_000

assert diff < 2_000_000_000 ## nanoseconds

asyncTest "Insert a message":
const contentTopic = "test-content-topic"
2 changes: 1 addition & 1 deletion vendor/negentropy
Submodule negentropy updated 1 files
+1 −1 cpp/Makefile
139 changes: 102 additions & 37 deletions waku/common/databases/db_postgres/dbconn.nim
@@ -1,7 +1,8 @@
import
std/[times, strutils, asyncnet, os, sequtils],
std/[times, strutils, asyncnet, os, sequtils, sets],
results,
chronos,
chronos/threadsync,
metrics,
re,
chronicles
@@ -11,9 +12,36 @@ include db_connector/db_postgres

type DataProc* = proc(result: ptr PGresult) {.closure, gcsafe, raises: [].}

type DbConnWrapper* = ref object
dbConn: DbConn
open: bool
preparedStmts: HashSet[string] ## [stmtName's]
futBecomeFree*: Future[void]
## to notify the pgasyncpool that this conn is free, i.e. not busy

## Connection management

proc check*(db: DbConn): Result[void, string] =
proc containsPreparedStmt*(dbConnWrapper: DbConnWrapper, preparedStmt: string): bool =
return dbConnWrapper.preparedStmts.contains(preparedStmt)

proc inclPreparedStmt*(dbConnWrapper: DbConnWrapper, preparedStmt: string) =
dbConnWrapper.preparedStmts.incl(preparedStmt)

proc getDbConn*(dbConnWrapper: DbConnWrapper): DbConn =
return dbConnWrapper.dbConn

proc isPgDbConnBusy*(dbConnWrapper: DbConnWrapper): bool =
if isNil(dbConnWrapper.futBecomeFree):
return false
return not dbConnWrapper.futBecomeFree.finished()

proc isPgDbConnOpen*(dbConnWrapper: DbConnWrapper): bool =
return dbConnWrapper.open

proc setPgDbConnOpen*(dbConnWrapper: DbConnWrapper, newOpenState: bool) =
dbConnWrapper.open = newOpenState

proc check(db: DbConn): Result[void, string] =
var message: string
try:
message = $db.pqErrorMessage()
@@ -25,11 +53,11 @@ proc check*(db: DbConn): Result[void, string] =

return ok()

proc open*(connString: string): Result[DbConn, string] =
proc openDbConn(connString: string): Result[DbConn, string] =
## Opens a new connection.
var conn: DbConn = nil
try:
conn = open("", "", "", connString)
conn = open("", "", "", connString) ## included from db_postgres module
except DbError:
return err("exception opening new connection: " & getCurrentExceptionMsg())

@@ -46,22 +74,35 @@ proc open*(connString: string): Result[DbConn, string] =

return ok(conn)

proc closeDbConn*(db: DbConn) {.raises: [OSError].} =
let fd = db.pqsocket()
if fd != -1:
asyncengine.unregister(cast[asyncengine.AsyncFD](fd))
db.close()
proc new*(T: type DbConnWrapper, connString: string): Result[T, string] =
let dbConn = openDbConn(connString).valueOr:
return err("failed to establish a new connection: " & $error)

return ok(DbConnWrapper(dbConn: dbConn, open: true))

proc closeDbConn*(
dbConnWrapper: DbConnWrapper
): Result[void, string] {.raises: [OSError].} =
let fd = dbConnWrapper.dbConn.pqsocket()
if fd == -1:
return err("error file descriptor -1 in closeDbConn")

asyncengine.unregister(cast[asyncengine.AsyncFD](fd))

dbConnWrapper.dbConn.close()

return ok()

proc `$`(self: SqlQuery): string =
return cast[string](self)

proc sendQuery(
db: DbConn, query: SqlQuery, args: seq[string]
dbConnWrapper: DbConnWrapper, query: SqlQuery, args: seq[string]
): Future[Result[void, string]] {.async.} =
## This proc can be used directly for queries that don't retrieve values back.

if db.status != CONNECTION_OK:
db.check().isOkOr:
if dbConnWrapper.dbConn.status != CONNECTION_OK:
dbConnWrapper.dbConn.check().isOkOr:
return err("failed to connect to database: " & $error)

return err("unknown reason")
@@ -72,17 +113,16 @@ proc sendQuery(
except DbError:
return err("exception formatting the query: " & getCurrentExceptionMsg())

let success = db.pqsendQuery(cstring(wellFormedQuery))
let success = dbConnWrapper.dbConn.pqsendQuery(cstring(wellFormedQuery))
if success != 1:
db.check().isOkOr:
dbConnWrapper.dbConn.check().isOkOr:
return err("failed pqsendQuery: " & $error)

return err("failed pqsendQuery: unknown reason")

return ok()

proc sendQueryPrepared(
db: DbConn,
dbConnWrapper: DbConnWrapper,
stmtName: string,
paramValues: openArray[string],
paramLengths: openArray[int32],
@@ -96,8 +136,8 @@ proc sendQueryPrepared(
$paramValues.len & " " & $paramLengths.len & " " & $paramFormats.len
return err("lengths discrepancies in sendQueryPrepared: " & $lengthsErrMsg)

if db.status != CONNECTION_OK:
db.check().isOkOr:
if dbConnWrapper.dbConn.status != CONNECTION_OK:
dbConnWrapper.dbConn.check().isOkOr:
return err("failed to connect to database: " & $error)

return err("unknown reason")
@@ -110,7 +150,7 @@ proc sendQueryPrepared(

const ResultFormat = 0 ## 0 for text format, 1 for binary format.

let success = db.pqsendQueryPrepared(
let success = dbConnWrapper.dbConn.pqsendQueryPrepared(
stmtName,
nParams,
cstrArrayParams,
@@ -119,49 +159,63 @@ proc sendQueryPrepared(
ResultFormat,
)
if success != 1:
db.check().isOkOr:
dbConnWrapper.dbConn.check().isOkOr:
return err("failed pqsendQueryPrepared: " & $error)

return err("failed pqsendQueryPrepared: unknown reason")

return ok()

proc waitQueryToFinish(
db: DbConn, rowCallback: DataProc = nil
dbConnWrapper: DbConnWrapper, rowCallback: DataProc = nil
): Future[Result[void, string]] {.async.} =
## The 'rowCallback' param is != nil when the underlying query wants to retrieve results (SELECT.)
## For other queries, like "INSERT", 'rowCallback' should be nil.

var dataAvailable = false
let futDataAvailable = newFuture[void]("futDataAvailable")

proc onDataAvailable(udata: pointer) {.gcsafe, raises: [].} =
dataAvailable = true
if not futDataAvailable.completed():
futDataAvailable.complete()

let asyncFd = cast[asyncengine.AsyncFD](pqsocket(db))
let asyncFd = cast[asyncengine.AsyncFD](pqsocket(dbConnWrapper.dbConn))

asyncengine.addReader2(asyncFd, onDataAvailable).isOkOr:
dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error))
return err("failed to add event reader in waitQueryToFinish: " & $error)
defer:
asyncengine.removeReader2(asyncFd).isOkOr:
return err("failed to remove event reader in waitQueryToFinish: " & $error)

while not dataAvailable:
await sleepAsync(timer.milliseconds(1))
await futDataAvailable

## Now retrieve the result
## Now retrieve the result from the database
while true:
let pqResult = db.pqgetResult()
let pqResult = dbConnWrapper.dbConn.pqgetResult()

if pqResult == nil:
db.check().isOkOr:
dbConnWrapper.dbConn.check().isOkOr:
if not dbConnWrapper.futBecomeFree.failed():
dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error))
return err("error in query: " & $error)

return ok() # reached the end of the results
dbConnWrapper.futBecomeFree.complete()
return ok() # reached the end of the results. The query is completed

if not rowCallback.isNil():
rowCallback(pqResult)

pqclear(pqResult)

proc dbConnQuery*(
db: DbConn, query: SqlQuery, args: seq[string], rowCallback: DataProc
dbConnWrapper: DbConnWrapper,
query: SqlQuery,
args: seq[string],
rowCallback: DataProc,
requestId: string,
): Future[Result[void, string]] {.async, gcsafe.} =
dbConnWrapper.futBecomeFree = newFuture[void]("dbConnQuery")

let cleanedQuery = ($query).replace(" ", "").replace("\n", "")
## remove everything between ' or " all possible sequence of numbers. e.g. rm partition partition
var querySummary = cleanedQuery.replace(re"""(['"]).*?\1""", "")
@@ -170,15 +224,17 @@ proc dbConnQuery*(

var queryStartTime = getTime().toUnixFloat()

(await db.sendQuery(query, args)).isOkOr:
(await dbConnWrapper.sendQuery(query, args)).isOkOr:
error "error in dbConnQuery", error = $error
dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error))
return err("error in dbConnQuery calling sendQuery: " & $error)

let sendDuration = getTime().toUnixFloat() - queryStartTime
query_time_secs.set(sendDuration, [querySummary, "sendQuery"])

queryStartTime = getTime().toUnixFloat()

(await db.waitQueryToFinish(rowCallback)).isOkOr:
(await dbConnWrapper.waitQueryToFinish(rowCallback)).isOkOr:
return err("error in dbConnQuery calling waitQueryToFinish: " & $error)

let waitDuration = getTime().toUnixFloat() - queryStartTime
@@ -188,6 +244,7 @@ proc dbConnQuery*(

if "insert" notin ($query).toLower():
debug "dbConnQuery",
requestId,
query = $query,
querySummary,
waitDurationSecs = waitDuration,
@@ -196,23 +253,28 @@ proc dbConnQuery*(
return ok()

proc dbConnQueryPrepared*(
db: DbConn,
dbConnWrapper: DbConnWrapper,
stmtName: string,
paramValues: seq[string],
paramLengths: seq[int32],
paramFormats: seq[int32],
rowCallback: DataProc,
requestId: string,
): Future[Result[void, string]] {.async, gcsafe.} =
dbConnWrapper.futBecomeFree = newFuture[void]("dbConnQueryPrepared")
var queryStartTime = getTime().toUnixFloat()
db.sendQueryPrepared(stmtName, paramValues, paramLengths, paramFormats).isOkOr:

dbConnWrapper.sendQueryPrepared(stmtName, paramValues, paramLengths, paramFormats).isOkOr:
dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error))
error "error in dbConnQueryPrepared", error = $error
return err("error in dbConnQueryPrepared calling sendQuery: " & $error)

let sendDuration = getTime().toUnixFloat() - queryStartTime
query_time_secs.set(sendDuration, [stmtName, "sendQuery"])

queryStartTime = getTime().toUnixFloat()

(await db.waitQueryToFinish(rowCallback)).isOkOr:
(await dbConnWrapper.waitQueryToFinish(rowCallback)).isOkOr:
return err("error in dbConnQueryPrepared calling waitQueryToFinish: " & $error)

let waitDuration = getTime().toUnixFloat() - queryStartTime
@@ -222,6 +284,9 @@ proc dbConnQueryPrepared*(

if "insert" notin stmtName.toLower():
debug "dbConnQueryPrepared",
stmtName, waitDurationSecs = waitDuration, sendDurationSecs = sendDuration
requestId,
stmtName,
waitDurationSecs = waitDuration,
sendDurationSecs = sendDuration

return ok()
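
The `waitQueryToFinish` hunk above replaces a 1 ms `sleepAsync` polling loop with a single `await futDataAvailable` that the reader callback completes. A minimal sketch of that pattern follows. It uses Nim's standard `std/asyncdispatch` as a stand-in for chronos, so the API differs in places from the diff. `signalAfter` and `waitForData` are hypothetical names, and the 10 ms timer merely simulates the socket becoming readable.

```nim
import std/asyncdispatch

proc signalAfter(fut: Future[void], ms: int) {.async.} =
  ## Stand-in for the socket reader callback: completes `fut` once,
  ## after `ms` milliseconds (simulating "data is available").
  await sleepAsync(ms)
  if not fut.finished:
    fut.complete()

proc waitForData(): Future[bool] {.async.} =
  ## Suspends until the notification arrives, instead of waking up
  ## every millisecond to re-check a boolean flag.
  let futDataAvailable = newFuture[void]("futDataAvailable")
  asyncCheck signalAfter(futDataAvailable, 10)
  await futDataAvailable
  return futDataAvailable.finished

when isMainModule:
  echo waitFor waitForData()
```

The guard before `complete()` mirrors the check in the diff: completing an already finished future is an error, and a reader callback may fire more than once.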