diff --git a/waku/common/databases/db_postgres/pgasyncpool.nim b/waku/common/databases/db_postgres/pgasyncpool.nim index b907cca97..e77d07109 100644 --- a/waku/common/databases/db_postgres/pgasyncpool.nim +++ b/waku/common/databases/db_postgres/pgasyncpool.nim @@ -3,7 +3,13 @@ {.push raises: [].} import - std/[sequtils, nre, strformat, sets], results, chronos, chronicles, strutils, random + std/[sequtils, nre, strformat, sets], + results, + chronos, + chronos/threadsync, + chronicles, + strutils, + random import ./dbconn, ../common, ../../../waku_core/time type PgAsyncPoolState {.pure.} = enum @@ -16,6 +22,7 @@ type PgDbConn = ref object open: bool busy: bool preparedStmts: HashSet[string] ## [stmtName's] + busySignal: ThreadSignalPtr ## signal to wait while the pool is busy type # Database connection pool @@ -26,6 +33,7 @@ type state: PgAsyncPoolState conns: seq[PgDbConn] dir: string + busySignal: ThreadSignalPtr proc `$`(self: PgDbConn): string = if not isNil(self): @@ -59,12 +67,16 @@ proc new*( SyntaxError: return err("could not parse postgres string: " & getCurrentExceptionMsg()) + let busySignal = ThreadSignalPtr.new().valueOr: + return err("error creating busySignal ThreadSignalPtr in PgAsyncPool: " & $error) + let pool = PgAsyncPool( connString: connString, maxConnections: maxConnections, state: PgAsyncPoolState.Live, conns: newSeq[PgDbConn](0), dir: dir, + busySignal: busySignal, ) return ok(pool) @@ -92,7 +104,8 @@ proc close*(pool: PgAsyncPool): Future[Result[void, string]] {.async.} = # wait for the connections to be released and close them, without # blocking the async runtime while pool.conns.anyIt(it.busy): - await sleepAsync(0.milliseconds) + debug "closing waiting while connections are busy" + await pool.busySignal.wait() for i in 0 ..< pool.conns.len: if pool.conns[i].busy: @@ -134,9 +147,9 @@ proc getConnIndex(pool: PgAsyncPool): Future[DatabaseResult[int]] {.async.} = if pool.conns.len == pool.maxConnections: ## Can't create more connections. Wait for a free connection without blocking the async runtime. - while pool.isBusy(): + if pool.isBusy(): debug "getConnIndex", dir = pool.dir, conns = $pool.conns - await sleepAsync(0.milliseconds) + await pool.busySignal.wait() return pool.getFirstFreeConnIndex() elif pool.conns.len < pool.maxConnections: @@ -174,6 +187,10 @@ proc releaseConn(pool: PgAsyncPool, conn: DbConn) = for i in 0 ..< pool.conns.len: if pool.conns[i].dbConn == conn: pool.conns[i].busy = false + + pool.busySignal.fireSync().isOkOr: + error "error triggering busySignal in releaseConn", error = $error + debug "after releaseConn", conns = $pool.conns const SlowQueryThresholdInNanoSeconds = 1_000_000_000