From 8b14e4ecae2c67d9ab1bcfe5dbcf03036814a487 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Thu, 26 Sep 2024 16:37:16 +0200 Subject: [PATCH 01/12] use messages_lookup to retrieve timestamps --- waku/waku_archive/driver/postgres_driver/postgres_driver.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index a69fdb9f4..bfdffa6bf 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -123,9 +123,9 @@ const SelectWithCursorNoDataAscStmtDef = timestamp <= $7 ORDER BY timestamp ASC, messageHash ASC LIMIT $8;""" -const SelectCursorByHashName = "SelectMessageByHash" +const SelectCursorByHashName = "SelectMessageByHashInMessagesLookup" const SelectCursorByHashDef = - """SELECT timestamp FROM messages + """SELECT timestamp FROM messages_lookup WHERE messageHash = $1""" const From ef474775777c3a83cb7513cf7f630dbe79a595af Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Thu, 26 Sep 2024 16:37:33 +0200 Subject: [PATCH 02/12] waku_node: remove duplicated log --- waku/node/waku_node.nim | 2 -- 1 file changed, 2 deletions(-) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 9996e6c13..0174b2188 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -336,8 +336,6 @@ proc subscribe*( error "Invalid API call to `subscribe`. Was already subscribed" return - debug "subscribe", pubsubTopic = pubsubTopic - node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic)) node.registerRelayDefaultHandler(pubsubTopic) From 5ccec216704d96c0477eca56883d4b71449ab74f Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Thu, 26 Sep 2024 18:46:41 +0200 Subject: [PATCH 03/12] pass requestId to dbconn to be printed --- waku/common/databases/db_postgres/dbconn.nim | 13 +++++++++++-- waku/common/databases/db_postgres/pgasyncpool.nim | 6 +++--- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/waku/common/databases/db_postgres/dbconn.nim b/waku/common/databases/db_postgres/dbconn.nim index bc5da4ee6..21d9b944c 100644 --- a/waku/common/databases/db_postgres/dbconn.nim +++ b/waku/common/databases/db_postgres/dbconn.nim @@ -160,7 +160,11 @@ proc waitQueryToFinish( pqclear(pqResult) proc dbConnQuery*( - db: DbConn, query: SqlQuery, args: seq[string], rowCallback: DataProc + db: DbConn, + query: SqlQuery, + args: seq[string], + rowCallback: DataProc, + requestId: string, ): Future[Result[void, string]] {.async, gcsafe.} = let cleanedQuery = ($query).replace(" ", "").replace("\n", "") ## remove everything between ' or " all possible sequence of numbers. e.g. rm partition partition @@ -188,6 +192,7 @@ proc dbConnQuery*( if "insert" notin ($query).toLower(): debug "dbConnQuery", + requestId, query = $query, querySummary, waitDurationSecs = waitDuration, @@ -202,6 +207,7 @@ proc dbConnQueryPrepared*( paramLengths: seq[int32], paramFormats: seq[int32], rowCallback: DataProc, + requestId: string, ): Future[Result[void, string]] {.async, gcsafe.} = var queryStartTime = getTime().toUnixFloat() db.sendQueryPrepared(stmtName, paramValues, paramLengths, paramFormats).isOkOr: @@ -222,6 +228,9 @@ proc dbConnQueryPrepared*( if "insert" notin stmtName.toLower(): debug "dbConnQueryPrepared", - stmtName, waitDurationSecs = waitDuration, sendDurationSecs = sendDuration + requestId, + stmtName, + waitDurationSecs = waitDuration, + sendDurationSecs = sendDuration return ok() diff --git a/waku/common/databases/db_postgres/pgasyncpool.nim b/waku/common/databases/db_postgres/pgasyncpool.nim index 66e66bd2f..4297e248e 100644 --- a/waku/common/databases/db_postgres/pgasyncpool.nim +++ b/waku/common/databases/db_postgres/pgasyncpool.nim @@ -146,7 +146,7 @@ proc releaseConn(pool: PgAsyncPool, conn: DbConn) = if pool.conns[i].dbConn == conn: pool.conns[i].busy = false -const SlowQueryThresholdInNanoSeconds = 2_000_000_000 +const SlowQueryThresholdInNanoSeconds = 1_000_000_000 proc pgQuery*( pool: PgAsyncPool, @@ -167,7 +167,7 @@ proc pgQuery*( debug "pgQuery slow query", query_duration_secs = (queryDuration / 1_000_000_000), query, requestId - (await conn.dbConnQuery(sql(query), args, rowCallback)).isOkOr: + (await conn.dbConnQuery(sql(query), args, rowCallback, requestId)).isOkOr: return err("error in asyncpool query: " & $error) return ok() @@ -218,7 +218,7 @@ proc runStmt*( ( await conn.dbConnQueryPrepared( - stmtName, paramValues, paramLengths, paramFormats, rowCallback + stmtName, paramValues, paramLengths, paramFormats, rowCallback, requestId ) ).isOkOr: return err("error in runStmt: " & $error) From 2cea2aa4c0449238ea79536cad3010d3293cdba8 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Fri, 27 Sep 2024 13:24:00 +0200 Subject: [PATCH 04/12] dbconn: add too verbose debug logs waitQueryToFinish --- waku/common/databases/db_postgres/dbconn.nim | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/waku/common/databases/db_postgres/dbconn.nim b/waku/common/databases/db_postgres/dbconn.nim index 21d9b944c..efd5fbfbd 100644 --- a/waku/common/databases/db_postgres/dbconn.nim +++ b/waku/common/databases/db_postgres/dbconn.nim @@ -127,11 +127,12 @@ proc sendQueryPrepared( return ok() proc waitQueryToFinish( - db: DbConn, rowCallback: DataProc = nil + db: DbConn, rowCallback: DataProc = nil, requestId: string ): Future[Result[void, string]] {.async.} = ## The 'rowCallback' param is != nil when the underlying query wants to retrieve results (SELECT.) ## For other queries, like "INSERT", 'rowCallback' should be nil. + debug "waitQueryToFinish", requestId var dataAvailable = false proc onDataAvailable(udata: pointer) {.gcsafe, raises: [].} = dataAvailable = true @@ -141,8 +142,10 @@ proc waitQueryToFinish( asyncengine.addReader2(asyncFd, onDataAvailable).isOkOr: return err("failed to add event reader in waitQueryToFinish: " & $error) + debug "waitQueryToFinish", requestId while not dataAvailable: await sleepAsync(timer.milliseconds(1)) + debug "waitQueryToFinish", requestId ## Now retrieve the result while true: @@ -152,6 +155,8 @@ proc waitQueryToFinish( db.check().isOkOr: return err("error in query: " & $error) + debug "waitQueryToFinish", requestId + return ok() # reached the end of the results if not rowCallback.isNil(): @@ -182,7 +187,7 @@ proc dbConnQuery*( queryStartTime = getTime().toUnixFloat() - (await db.waitQueryToFinish(rowCallback)).isOkOr: + (await db.waitQueryToFinish(rowCallback, requestId)).isOkOr: return err("error in dbConnQuery calling waitQueryToFinish: " & $error) let waitDuration = getTime().toUnixFloat() - queryStartTime @@ -218,7 +223,7 @@ proc dbConnQueryPrepared*( queryStartTime = getTime().toUnixFloat() - (await db.waitQueryToFinish(rowCallback)).isOkOr: + (await db.waitQueryToFinish(rowCallback, requestId)).isOkOr: return err("error in dbConnQueryPrepared calling waitQueryToFinish: " & $error) let waitDuration = getTime().toUnixFloat() - queryStartTime From c61e188476cb0c3d90ef36ad06f0efe53ef7f68d Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Fri, 27 Sep 2024 15:30:30 +0200 Subject: [PATCH 05/12] dbconn: add db detail in logs --- waku/common/databases/db_postgres/dbconn.nim | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/waku/common/databases/db_postgres/dbconn.nim b/waku/common/databases/db_postgres/dbconn.nim index efd5fbfbd..9b3a2e593 100644 --- a/waku/common/databases/db_postgres/dbconn.nim +++ b/waku/common/databases/db_postgres/dbconn.nim @@ -132,7 +132,7 @@ proc waitQueryToFinish( ## The 'rowCallback' param is != nil when the underlying query wants to retrieve results (SELECT.) ## For other queries, like "INSERT", 'rowCallback' should be nil. - debug "waitQueryToFinish", requestId + debug "waitQueryToFinish", requestId, db = db.repr var dataAvailable = false proc onDataAvailable(udata: pointer) {.gcsafe, raises: [].} = dataAvailable = true @@ -142,10 +142,10 @@ proc waitQueryToFinish( asyncengine.addReader2(asyncFd, onDataAvailable).isOkOr: return err("failed to add event reader in waitQueryToFinish: " & $error) - debug "waitQueryToFinish", requestId + debug "waitQueryToFinish", requestId, db = db.repr while not dataAvailable: await sleepAsync(timer.milliseconds(1)) - debug "waitQueryToFinish", requestId + debug "waitQueryToFinish", requestId, db = db.repr ## Now retrieve the result while true: @@ -155,7 +155,7 @@ proc waitQueryToFinish( db.check().isOkOr: return err("error in query: " & $error) - debug "waitQueryToFinish", requestId + debug "waitQueryToFinish", requestId, db = db.repr return ok() # reached the end of the results From a0a565007eccb2bf3a5cab1ad02373716c1010e8 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Sat, 28 Sep 2024 15:59:25 +0200 Subject: [PATCH 06/12] dbconn: use signal to coordinate resp from db instead of multiple awaits --- waku/common/databases/db_postgres/dbconn.nim | 21 ++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/waku/common/databases/db_postgres/dbconn.nim b/waku/common/databases/db_postgres/dbconn.nim index 9b3a2e593..48c049850 100644 --- a/waku/common/databases/db_postgres/dbconn.nim +++ b/waku/common/databases/db_postgres/dbconn.nim @@ -2,6 +2,7 @@ import std/[times, strutils, asyncnet, os, sequtils], results, chronos, + chronos/threadsync, metrics, re, chronicles @@ -132,20 +133,28 @@ proc waitQueryToFinish( ## The 'rowCallback' param is != nil when the underlying query wants to retrieve results (SELECT.) ## For other queries, like "INSERT", 'rowCallback' should be nil. - debug "waitQueryToFinish", requestId, db = db.repr - var dataAvailable = false + var triggered = false + var signal = ThreadSignalPtr.new().valueOr: + return err("error creating ThreadSignalPtr in waitQueryToFinish: " & $error) + proc onDataAvailable(udata: pointer) {.gcsafe, raises: [].} = - dataAvailable = true + if not triggered: + signal.fireSync().isOkOr: + error "error triggering coordination signal in dbconn", error = $error + triggered = true let asyncFd = cast[asyncengine.AsyncFD](pqsocket(db)) asyncengine.addReader2(asyncFd, onDataAvailable).isOkOr: return err("failed to add event reader in waitQueryToFinish: " & $error) + defer: + asyncengine.removeReader2(asyncFd).isOkOr: + return err("failed to remove event reader in waitQueryToFinish: " & $error) + signal.close().isOkOr: + return err("error closing data available signal: " & $error) debug "waitQueryToFinish", requestId, db = db.repr - while not dataAvailable: - await sleepAsync(timer.milliseconds(1)) - debug "waitQueryToFinish", requestId, db = db.repr + await signal.wait() ## Now retrieve the result while true: From ae5cd5fbb259771e3af0398f3b37b3adb2930886 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Sat, 28 Sep 2024 16:49:22 +0200 Subject: [PATCH 07/12] pgasyncpool: use thread signal to control when the pool is busy --- .../databases/db_postgres/pgasyncpool.nim | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/waku/common/databases/db_postgres/pgasyncpool.nim b/waku/common/databases/db_postgres/pgasyncpool.nim index 4297e248e..f2262708a 100644 --- a/waku/common/databases/db_postgres/pgasyncpool.nim +++ b/waku/common/databases/db_postgres/pgasyncpool.nim @@ -3,6 +3,13 @@ {.push raises: [].} import std/[sequtils, nre, strformat, sets], results, chronos, chronicles + std/[sequtils, nre, strformat, sets], + results, + chronos, + chronos/threadsync, + chronicles, + strutils, + random import ./dbconn, ../common, ../../../waku_core/time type PgAsyncPoolState {.pure.} = enum @@ -15,6 +22,7 @@ type PgDbConn = ref object open: bool busy: bool preparedStmts: HashSet[string] ## [stmtName's] + busySignal: ThreadSignalPtr ## signal to wait while the pool is busy type # Database connection pool @@ -24,6 +32,7 @@ type state: PgAsyncPoolState conns: seq[PgDbConn] + busySignal: ThreadSignalPtr proc new*(T: type PgAsyncPool, dbUrl: string, maxConnections: int): DatabaseResult[T] = var connString: string @@ -42,11 +51,15 @@ proc new*(T: type PgAsyncPool, dbUrl: string, maxConnections: int): DatabaseResu SyntaxError: return err("could not parse postgres string: " & getCurrentExceptionMsg()) + let busySignal = ThreadSignalPtr.new().valueOr: + return err("error creating busySignal ThreadSignalPtr in PgAsyncPool: " & $error) + let pool = PgAsyncPool( connString: connString, maxConnections: maxConnections, state: PgAsyncPoolState.Live, conns: newSeq[PgDbConn](0), + busySignal: busySignal, ) return ok(pool) @@ -70,7 +83,8 @@ proc close*(pool: PgAsyncPool): Future[Result[void, string]] {.async.} = # wait for the connections to be released and close them, without # blocking the async runtime while pool.conns.anyIt(it.busy): - await sleepAsync(0.milliseconds) + debug "closing waiting while connections are busy" + await pool.busySignal.wait() for i in 0 ..< pool.conns.len: if pool.conns[i].busy: @@ -110,8 +124,8 @@ proc getConnIndex(pool: PgAsyncPool): Future[DatabaseResult[int]] {.async.} = if pool.conns.len == pool.maxConnections: ## Can't create more connections. Wait for a free connection without blocking the async runtime. - while pool.isBusy(): - await sleepAsync(0.milliseconds) + if pool.isBusy(): + await pool.busySignal.wait() return pool.getFirstFreeConnIndex() elif pool.conns.len < pool.maxConnections: @@ -146,6 +160,10 @@ proc releaseConn(pool: PgAsyncPool, conn: DbConn) = if pool.conns[i].dbConn == conn: pool.conns[i].busy = false + pool.busySignal.fireSync().isOkOr: + error "error triggering busySignal in releaseConn", error = $error + + const SlowQueryThresholdInNanoSeconds = 1_000_000_000 proc pgQuery*( From e7fec5de94c90ff5d4c5ad25e2049ec417fce586 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Mon, 30 Sep 2024 03:06:18 +0200 Subject: [PATCH 08/12] deep refactoring in db_postgres for better use of async approach --- tests/waku_archive/test_driver_postgres.nim | 9 +- waku/common/databases/db_postgres/dbconn.nim | 132 ++++++++++++------ .../databases/db_postgres/pgasyncpool.nim | 114 ++++----------- 3 files changed, 119 insertions(+), 136 deletions(-) diff --git a/tests/waku_archive/test_driver_postgres.nim b/tests/waku_archive/test_driver_postgres.nim index 7b808c14d..34a428615 100644 --- a/tests/waku_archive/test_driver_postgres.nim +++ b/tests/waku_archive/test_driver_postgres.nim @@ -34,16 +34,15 @@ suite "Postgres driver": var futures = newSeq[Future[ArchiveDriverResult[void]]](0) let beforeSleep = now() - for _ in 1 .. 100: + + for _ in 1 .. 25: futures.add(driver.sleep(1)) await allFutures(futures) let diff = now() - beforeSleep - # Actually, the diff randomly goes between 1 and 2 seconds. - # although in theory it should spend 1s because we establish 100 - # connections and we spawn 100 tasks that spend ~1s each. - assert diff < 20_000_000_000 + + assert diff < 2_000_000_000 ## nanoseconds asyncTest "Insert a message": const contentTopic = "test-content-topic" diff --git a/waku/common/databases/db_postgres/dbconn.nim b/waku/common/databases/db_postgres/dbconn.nim index 48c049850..1708536c4 100644 --- a/waku/common/databases/db_postgres/dbconn.nim +++ b/waku/common/databases/db_postgres/dbconn.nim @@ -1,5 +1,5 @@ import - std/[times, strutils, asyncnet, os, sequtils], + std/[times, strutils, asyncnet, os, sequtils, sets], results, chronos, chronos/threadsync, @@ -12,9 +12,36 @@ include db_connector/db_postgres type DataProc* = proc(result: ptr PGresult) {.closure, gcsafe, raises: [].} +type DbConnWrapper* = ref object + dbConn: DbConn + open: bool + preparedStmts: HashSet[string] ## [stmtName's] + futBecomeFree*: Future[void] + ## to notify the pgasyncpool that this conn is free, i.e. not busy + ## Connection management -proc check*(db: DbConn): Result[void, string] = +proc containsPreparedStmt*(dbConnWrapper: DbConnWrapper, preparedStmt: string): bool = + return dbConnWrapper.preparedStmts.contains(preparedStmt) + +proc inclPreparedStmt*(dbConnWrapper: DbConnWrapper, preparedStmt: string) = + dbConnWrapper.preparedStmts.incl(preparedStmt) + +proc getDbConn*(dbConnWrapper: DbConnWrapper): DbConn = + return dbConnWrapper.dbConn + +proc isPgDbConnBusy*(dbConnWrapper: DbConnWrapper): bool = + if isNil(dbConnWrapper.futBecomeFree): + return false + return not dbConnWrapper.futBecomeFree.finished() + +proc isPgDbConnOpen*(dbConnWrapper: DbConnWrapper): bool = + return dbConnWrapper.open + +proc setPgDbConnOpen*(dbConnWrapper: DbConnWrapper, newOpenState: bool) = + dbConnWrapper.open = newOpenState + +proc check(db: DbConn): Result[void, string] = var message: string try: message = $db.pqErrorMessage() @@ -26,11 +53,11 @@ proc check*(db: DbConn): Result[void, string] = return ok() -proc open*(connString: string): Result[DbConn, string] = +proc openDbConn(connString: string): Result[DbConn, string] = ## Opens a new connection. var conn: DbConn = nil try: - conn = open("", "", "", connString) + conn = open("", "", "", connString) ## included from db_postgres module except DbError: return err("exception opening new connection: " & getCurrentExceptionMsg()) @@ -47,22 +74,35 @@ proc open*(connString: string): Result[DbConn, string] = return ok(conn) -proc closeDbConn*(db: DbConn) {.raises: [OSError].} = - let fd = db.pqsocket() - if fd != -1: - asyncengine.unregister(cast[asyncengine.AsyncFD](fd)) - db.close() +proc new*(T: type DbConnWrapper, connString: string): Result[T, string] = + let dbConn = openDbConn(connString).valueOr: + return err("failed to stablish a new connection: " & $error) + + return ok(DbConnWrapper(dbConn: dbConn, open: true)) + +proc closeDbConn*( + dbConnWrapper: DbConnWrapper +): Result[void, string] {.raises: [OSError].} = + let fd = dbConnWrapper.dbConn.pqsocket() + if fd == -1: + return err("error file descriptor -1 in closeDbConn") + + asyncengine.unregister(cast[asyncengine.AsyncFD](fd)) + + dbConnWrapper.dbConn.close() + + return ok() proc `$`(self: SqlQuery): string = return cast[string](self) proc sendQuery( - db: DbConn, query: SqlQuery, args: seq[string] + dbConnWrapper: DbConnWrapper, query: SqlQuery, args: seq[string] ): Future[Result[void, string]] {.async.} = ## This proc can be used directly for queries that don't retrieve values back. - if db.status != CONNECTION_OK: - db.check().isOkOr: + if dbConnWrapper.dbConn.status != CONNECTION_OK: + dbConnWrapper.dbConn.check().isOkOr: return err("failed to connect to database: " & $error) return err("unknown reason") @@ -73,17 +113,16 @@ proc sendQuery( except DbError: return err("exception formatting the query: " & getCurrentExceptionMsg()) - let success = db.pqsendQuery(cstring(wellFormedQuery)) + let success = dbConnWrapper.dbConn.pqsendQuery(cstring(wellFormedQuery)) if success != 1: - db.check().isOkOr: + dbConnWrapper.dbConn.check().isOkOr: return err("failed pqsendQuery: " & $error) - return err("failed pqsendQuery: unknown reason") return ok() proc sendQueryPrepared( - db: DbConn, + dbConnWrapper: DbConnWrapper, stmtName: string, paramValues: openArray[string], paramLengths: openArray[int32], @@ -97,8 +136,8 @@ proc sendQueryPrepared( $paramValues.len & " " & $paramLengths.len & " " & $paramFormats.len return err("lengths discrepancies in sendQueryPrepared: " & $lengthsErrMsg) - if db.status != CONNECTION_OK: - db.check().isOkOr: + if dbConnWrapper.dbConn.status != CONNECTION_OK: + dbConnWrapper.dbConn.check().isOkOr: return err("failed to connect to database: " & $error) return err("unknown reason") @@ -111,7 +150,7 @@ proc sendQueryPrepared( const ResultFormat = 0 ## 0 for text format, 1 for binary format. - let success = db.pqsendQueryPrepared( + let success = dbConnWrapper.dbConn.pqsendQueryPrepared( stmtName, nParams, cstrArrayParams, @@ -120,7 +159,7 @@ proc sendQueryPrepared( ResultFormat, ) if success != 1: - db.check().isOkOr: + dbConnWrapper.dbConn.check().isOkOr: return err("failed pqsendQueryPrepared: " & $error) return err("failed pqsendQueryPrepared: unknown reason") @@ -128,45 +167,42 @@ proc sendQueryPrepared( return ok() proc waitQueryToFinish( - db: DbConn, rowCallback: DataProc = nil, requestId: string + dbConnWrapper: DbConnWrapper, rowCallback: DataProc = nil ): Future[Result[void, string]] {.async.} = ## The 'rowCallback' param is != nil when the underlying query wants to retrieve results (SELECT.) ## For other queries, like "INSERT", 'rowCallback' should be nil. - var triggered = false - var signal = ThreadSignalPtr.new().valueOr: - return err("error creating ThreadSignalPtr in waitQueryToFinish: " & $error) + var triggered = false ## to control the "data available" signal is only triggered once + + let futDataAvailable = newFuture[void]("futDataAvailable") proc onDataAvailable(udata: pointer) {.gcsafe, raises: [].} = if not triggered: - signal.fireSync().isOkOr: - error "error triggering coordination signal in dbconn", error = $error - triggered = true + futDataAvailable.complete() + triggered = true - let asyncFd = cast[asyncengine.AsyncFD](pqsocket(db)) + let asyncFd = cast[asyncengine.AsyncFD](pqsocket(dbConnWrapper.dbConn)) asyncengine.addReader2(asyncFd, onDataAvailable).isOkOr: + dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error)) return err("failed to add event reader in waitQueryToFinish: " & $error) defer: asyncengine.removeReader2(asyncFd).isOkOr: return err("failed to remove event reader in waitQueryToFinish: " & $error) - signal.close().isOkOr: - return err("error closing data available signal: " & $error) - debug "waitQueryToFinish", requestId, db = db.repr - await signal.wait() + await futDataAvailable - ## Now retrieve the result + ## Now retrieve the result from the database while true: - let pqResult = db.pqgetResult() + let pqResult = dbConnWrapper.dbConn.pqgetResult() if pqResult == nil: - db.check().isOkOr: + dbConnWrapper.dbConn.check().isOkOr: + dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error)) return err("error in query: " & $error) - debug "waitQueryToFinish", requestId, db = db.repr - - return ok() # reached the end of the results + dbConnWrapper.futBecomeFree.complete() + return ok() # reached the end of the results. The query is completed if not rowCallback.isNil(): rowCallback(pqResult) @@ -174,12 +210,14 @@ proc waitQueryToFinish( pqclear(pqResult) proc dbConnQuery*( - db: DbConn, + dbConnWrapper: DbConnWrapper, query: SqlQuery, args: seq[string], rowCallback: DataProc, requestId: string, ): Future[Result[void, string]] {.async, gcsafe.} = + dbConnWrapper.futBecomeFree = newFuture[void]("dbConnQuery") + let cleanedQuery = ($query).replace(" ", "").replace("\n", "") ## remove everything between ' or " all possible sequence of numbers. e.g. rm partition partition var querySummary = cleanedQuery.replace(re"""(['"]).*?\1""", "") @@ -188,7 +226,9 @@ proc dbConnQuery*( var queryStartTime = getTime().toUnixFloat() - (await db.sendQuery(query, args)).isOkOr: + (await dbConnWrapper.sendQuery(query, args)).isOkOr: + error "error in dbConnQuery", error = $error + dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error)) return err("error in dbConnQuery calling sendQuery: " & $error) let sendDuration = getTime().toUnixFloat() - queryStartTime @@ -196,7 +236,7 @@ proc dbConnQuery*( queryStartTime = getTime().toUnixFloat() - (await db.waitQueryToFinish(rowCallback, requestId)).isOkOr: + (await dbConnWrapper.waitQueryToFinish(rowCallback)).isOkOr: return err("error in dbConnQuery calling waitQueryToFinish: " & $error) let waitDuration = getTime().toUnixFloat() - queryStartTime @@ -215,7 +255,7 @@ proc dbConnQuery*( return ok() proc dbConnQueryPrepared*( - db: DbConn, + dbConnWrapper: DbConnWrapper, stmtName: string, paramValues: seq[string], paramLengths: seq[int32], @@ -223,8 +263,12 @@ proc dbConnQueryPrepared*( rowCallback: DataProc, requestId: string, ): Future[Result[void, string]] {.async, gcsafe.} = + dbConnWrapper.futBecomeFree = newFuture[void]("dbConnQueryPrepared") var queryStartTime = getTime().toUnixFloat() - db.sendQueryPrepared(stmtName, paramValues, paramLengths, paramFormats).isOkOr: + + dbConnWrapper.sendQueryPrepared(stmtName, paramValues, paramLengths, paramFormats).isOkOr: + dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error)) + error "error in dbConnQueryPrepared", error = $error return err("error in dbConnQueryPrepared calling sendQuery: " & $error) let sendDuration = getTime().toUnixFloat() - queryStartTime @@ -232,7 +276,7 @@ proc dbConnQueryPrepared*( queryStartTime = getTime().toUnixFloat() - (await db.waitQueryToFinish(rowCallback, requestId)).isOkOr: + (await dbConnWrapper.waitQueryToFinish(rowCallback)).isOkOr: return err("error in dbConnQueryPrepared calling waitQueryToFinish: " & $error) let waitDuration = getTime().toUnixFloat() - queryStartTime diff --git a/waku/common/databases/db_postgres/pgasyncpool.nim b/waku/common/databases/db_postgres/pgasyncpool.nim index f2262708a..6d2ff2ea7 100644 --- a/waku/common/databases/db_postgres/pgasyncpool.nim +++ b/waku/common/databases/db_postgres/pgasyncpool.nim @@ -2,37 +2,22 @@ # Inspired by: https://github.com/treeform/pg/ {.push raises: [].} -import std/[sequtils, nre, strformat, sets], results, chronos, chronicles - std/[sequtils, nre, strformat, sets], +import + std/[sequtils, nre, strformat], results, chronos, chronos/threadsync, chronicles, - strutils, - random + strutils import ./dbconn, ../common, ../../../waku_core/time -type PgAsyncPoolState {.pure.} = enum - Closed - Live - Closing - -type PgDbConn = ref object - dbConn: DbConn - open: bool - busy: bool - preparedStmts: HashSet[string] ## [stmtName's] - busySignal: ThreadSignalPtr ## signal to wait while the pool is busy - type # Database connection pool PgAsyncPool* = ref object connString: string maxConnections: int - - state: PgAsyncPoolState - conns: seq[PgDbConn] - busySignal: ThreadSignalPtr + conns: seq[DbConnWrapper] + busySignal: ThreadSignalPtr ## signal to wait while the pool is busy proc new*(T: type PgAsyncPool, dbUrl: string, maxConnections: int): DatabaseResult[T] = var connString: string @@ -51,93 +36,64 @@ proc new*(T: type PgAsyncPool, dbUrl: string, maxConnections: int): DatabaseResu SyntaxError: return err("could not parse postgres string: " & getCurrentExceptionMsg()) - let busySignal = ThreadSignalPtr.new().valueOr: - return err("error creating busySignal ThreadSignalPtr in PgAsyncPool: " & $error) - let pool = PgAsyncPool( connString: connString, maxConnections: maxConnections, - state: PgAsyncPoolState.Live, - conns: newSeq[PgDbConn](0), - busySignal: busySignal, + conns: newSeq[DbConnWrapper](0), ) return ok(pool) -func isLive(pool: PgAsyncPool): bool = - pool.state == PgAsyncPoolState.Live - func isBusy(pool: PgAsyncPool): bool = - pool.conns.mapIt(it.busy).allIt(it) + return pool.conns.mapIt(it.isPgDbConnBusy()).allIt(it) proc close*(pool: PgAsyncPool): Future[Result[void, string]] {.async.} = ## Gracefully wait and close all openned connections - - if pool.state == PgAsyncPoolState.Closing: - while pool.state == PgAsyncPoolState.Closing: - await sleepAsync(0.milliseconds) # Do not block the async runtime - return ok() - - pool.state = PgAsyncPoolState.Closing - # wait for the connections to be released and close them, without # blocking the async runtime - while pool.conns.anyIt(it.busy): - debug "closing waiting while connections are busy" - await pool.busySignal.wait() - for i in 0 ..< pool.conns.len: - if pool.conns[i].busy: - continue + debug "close PgAsyncPool" + await allFutures(pool.conns.mapIt(it.futBecomeFree)) + debug "closing all connection PgAsyncPool" for i in 0 ..< pool.conns.len: - if pool.conns[i].open: - pool.conns[i].dbConn.closeDbConn() - pool.conns[i].busy = false - pool.conns[i].open = false + if pool.conns[i].isPgDbConnOpen(): + pool.conns[i].closeDbConn().isOkOr: + return err("error in close PgAsyncPool: " & $error) + pool.conns[i].setPgDbConnOpen(false) pool.conns.setLen(0) - pool.state = PgAsyncPoolState.Closed return ok() proc getFirstFreeConnIndex(pool: PgAsyncPool): DatabaseResult[int] = for index in 0 ..< pool.conns.len: - if pool.conns[index].busy: + if pool.conns[index].isPgDbConnBusy(): continue ## Pick up the first free connection and set it busy - pool.conns[index].busy = true return ok(index) proc getConnIndex(pool: PgAsyncPool): Future[DatabaseResult[int]] {.async.} = ## Waits for a free connection or create if max connections limits have not been reached. ## Returns the index of the free connection - if not pool.isLive(): - return err("pool is not live") - if not pool.isBusy(): return pool.getFirstFreeConnIndex() ## Pool is busy then - if pool.conns.len == pool.maxConnections: ## Can't create more connections. Wait for a free connection without blocking the async runtime. - if pool.isBusy(): - await pool.busySignal.wait() + let busyFuts = pool.conns.mapIt(it.futBecomeFree) + discard await one(busyFuts) return pool.getFirstFreeConnIndex() elif pool.conns.len < pool.maxConnections: ## stablish a new connection - let conn = dbconn.open(pool.connString).valueOr: - return err("failed to stablish a new connection: " & $error) + let dbConn = DbConnWrapper.new(pool.connString).valueOr: + return err("error creating DbConnWrapper: " & $error) - pool.conns.add( - PgDbConn( - dbConn: conn, open: true, busy: true, preparedStmts: initHashSet[string]() - ) - ) + pool.conns.add(dbConn) return ok(pool.conns.len - 1) proc resetConnPool*(pool: PgAsyncPool): Future[DatabaseResult[void]] {.async.} = @@ -145,25 +101,11 @@ proc resetConnPool*(pool: PgAsyncPool): Future[DatabaseResult[void]] {.async.} = ## This proc is intended to be called when the connection with the database ## got interrupted from the database side or a connectivity problem happened. - for i in 0 ..< pool.conns.len: - pool.conns[i].busy = false - (await pool.close()).isOkOr: return err("error in resetConnPool: " & error) - pool.state = PgAsyncPoolState.Live return ok() -proc releaseConn(pool: PgAsyncPool, conn: DbConn) = - ## Marks the connection as released. - for i in 0 ..< pool.conns.len: - if pool.conns[i].dbConn == conn: - pool.conns[i].busy = false - - pool.busySignal.fireSync().isOkOr: - error "error triggering busySignal in releaseConn", error = $error - - const SlowQueryThresholdInNanoSeconds = 1_000_000_000 proc pgQuery*( @@ -177,15 +119,14 @@ proc pgQuery*( return err("connRes.isErr in query: " & $error) let queryStartTime = getNowInNanosecondTime() - let conn = pool.conns[connIndex].dbConn + let dbConnWrapper = pool.conns[connIndex] defer: - pool.releaseConn(conn) let queryDuration = getNowInNanosecondTime() - queryStartTime if queryDuration > SlowQueryThresholdInNanoSeconds: debug "pgQuery slow query", query_duration_secs = (queryDuration / 1_000_000_000), query, requestId - (await conn.dbConnQuery(sql(query), args, rowCallback, requestId)).isOkOr: + (await dbConnWrapper.dbConnQuery(sql(query), args, rowCallback, requestId)).isOkOr: return err("error in asyncpool query: " & $error) return ok() @@ -210,11 +151,10 @@ proc runStmt*( let connIndex = (await pool.getConnIndex()).valueOr: return err("Error in runStmt: " & $error) - let conn = pool.conns[connIndex].dbConn + let dbConnWrapper = pool.conns[connIndex] let queryStartTime = getNowInNanosecondTime() defer: - pool.releaseConn(conn) let queryDuration = getNowInNanosecondTime() - queryStartTime if queryDuration > SlowQueryThresholdInNanoSeconds: debug "runStmt slow query", @@ -222,20 +162,20 @@ proc runStmt*( query = stmtDefinition, requestId - if not pool.conns[connIndex].preparedStmts.contains(stmtName): + if not pool.conns[connIndex].containsPreparedStmt(stmtName): # The connection doesn't have that statement yet. Let's create it. # Each session/connection has its own prepared statements. let res = catch: let len = paramValues.len - discard conn.prepare(stmtName, sql(stmtDefinition), len) + discard dbConnWrapper.getDbConn().prepare(stmtName, sql(stmtDefinition), len) if res.isErr(): return err("failed prepare in runStmt: " & res.error.msg) - pool.conns[connIndex].preparedStmts.incl(stmtName) + pool.conns[connIndex].inclPreparedStmt(stmtName) ( - await conn.dbConnQueryPrepared( + await dbConnWrapper.dbConnQueryPrepared( stmtName, paramValues, paramLengths, paramFormats, rowCallback, requestId ) ).isOkOr: From 500ca09b1e111eba8bf5c6d41972c636e3cf113a Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Mon, 30 Sep 2024 17:23:02 +0200 Subject: [PATCH 09/12] Update waku/common/databases/db_postgres/dbconn.nim Co-authored-by: gabrielmer <101006718+gabrielmer@users.noreply.github.com> --- waku/common/databases/db_postgres/dbconn.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku/common/databases/db_postgres/dbconn.nim b/waku/common/databases/db_postgres/dbconn.nim index 1708536c4..c56bec52f 100644 --- a/waku/common/databases/db_postgres/dbconn.nim +++ b/waku/common/databases/db_postgres/dbconn.nim @@ -76,7 +76,7 @@ proc openDbConn(connString: string): Result[DbConn, string] = proc new*(T: type DbConnWrapper, connString: string): Result[T, string] = let dbConn = openDbConn(connString).valueOr: - return err("failed to stablish a new connection: " & $error) + return err("failed to establish a new connection: " & $error) return ok(DbConnWrapper(dbConn: dbConn, open: true)) From a13c68f6091c50bc9f85c39d4f054cc5ab037036 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Mon, 30 Sep 2024 22:50:53 +0200 Subject: [PATCH 10/12] dbconn: better protection against possible double future ending and simplify --- waku/common/databases/db_postgres/dbconn.nim | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/waku/common/databases/db_postgres/dbconn.nim b/waku/common/databases/db_postgres/dbconn.nim index c56bec52f..aae8d80bc 100644 --- a/waku/common/databases/db_postgres/dbconn.nim +++ b/waku/common/databases/db_postgres/dbconn.nim @@ -172,14 +172,11 @@ proc waitQueryToFinish( ## The 'rowCallback' param is != nil when the underlying query wants to retrieve results (SELECT.) ## For other queries, like "INSERT", 'rowCallback' should be nil. - var triggered = false ## to control the "data available" signal is only triggered once - let futDataAvailable = newFuture[void]("futDataAvailable") proc onDataAvailable(udata: pointer) {.gcsafe, raises: [].} = - if not triggered: + if not futDataAvailable.completed(): futDataAvailable.complete() - triggered = true let asyncFd = cast[asyncengine.AsyncFD](pqsocket(dbConnWrapper.dbConn)) @@ -198,7 +195,8 @@ proc waitQueryToFinish( if pqResult == nil: dbConnWrapper.dbConn.check().isOkOr: - dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error)) + if not dbConnWrapper.futBecomeFree.failed(): + dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error)) return err("error in query: " & $error) dbConnWrapper.futBecomeFree.complete() From 54e635d38032177ca56eb07570473b48b8308c49 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Tue, 1 Oct 2024 22:20:56 +0200 Subject: [PATCH 11/12] pgasyncpool: apply Zoltan recommendation to make time threshold more readable --- waku/common/databases/db_postgres/pgasyncpool.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/waku/common/databases/db_postgres/pgasyncpool.nim b/waku/common/databases/db_postgres/pgasyncpool.nim index 6d2ff2ea7..d8a8c72ec 100644 --- a/waku/common/databases/db_postgres/pgasyncpool.nim +++ b/waku/common/databases/db_postgres/pgasyncpool.nim @@ -106,7 +106,7 @@ proc resetConnPool*(pool: PgAsyncPool): Future[DatabaseResult[void]] {.async.} = return ok() -const SlowQueryThresholdInNanoSeconds = 1_000_000_000 +const SlowQueryThreshold = 1.seconds proc pgQuery*( pool: PgAsyncPool, @@ -122,7 +122,7 @@ proc pgQuery*( let dbConnWrapper = pool.conns[connIndex] defer: let queryDuration = getNowInNanosecondTime() - queryStartTime - if queryDuration > SlowQueryThresholdInNanoSeconds: + if queryDuration > SlowQueryThreshold.nanos: debug "pgQuery slow query", query_duration_secs = (queryDuration / 1_000_000_000), query, requestId From 7547e46377d4e8fa381681e9630fc098d372e11e Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Tue, 1 Oct 2024 22:53:03 +0200 Subject: [PATCH 12/12] pgasyncpool: fix compilation issue after recent change --- waku/common/databases/db_postgres/pgasyncpool.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku/common/databases/db_postgres/pgasyncpool.nim b/waku/common/databases/db_postgres/pgasyncpool.nim index d8a8c72ec..10e70eb51 100644 --- a/waku/common/databases/db_postgres/pgasyncpool.nim +++ b/waku/common/databases/db_postgres/pgasyncpool.nim @@ -156,7 +156,7 @@ proc runStmt*( defer: let queryDuration = getNowInNanosecondTime() - queryStartTime - if queryDuration > SlowQueryThresholdInNanoSeconds: + if queryDuration > SlowQueryThreshold.nanos: debug "runStmt slow query", query_duration = queryDuration / 1_000_000_000, query = stmtDefinition,