Skip to content

Commit

Permalink
pgasyncpool: use thread signal to control when the pool is busy
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivansete-status committed Sep 28, 2024
1 parent 57b0f7d commit d3a33da
Showing 1 changed file with 21 additions and 4 deletions.
25 changes: 21 additions & 4 deletions waku/common/databases/db_postgres/pgasyncpool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@
{.push raises: [].}

import
std/[sequtils, nre, strformat, sets], results, chronos, chronicles, strutils, random
std/[sequtils, nre, strformat, sets],
results,
chronos,
chronos/threadsync,
chronicles,
strutils,
random
import ./dbconn, ../common, ../../../waku_core/time

type PgAsyncPoolState {.pure.} = enum
Expand All @@ -16,6 +22,7 @@ type PgDbConn = ref object
open: bool
busy: bool
preparedStmts: HashSet[string] ## [stmtName's]
busySignal: ThreadSignalPtr ## signal to wait while the pool is busy

type
# Database connection pool
Expand All @@ -26,6 +33,7 @@ type
state: PgAsyncPoolState
conns: seq[PgDbConn]
dir: string
busySignal: ThreadSignalPtr

proc `$`(self: PgDbConn): string =
if not isNil(self):
Expand Down Expand Up @@ -59,12 +67,16 @@ proc new*(
SyntaxError:
return err("could not parse postgres string: " & getCurrentExceptionMsg())

let busySignal = ThreadSignalPtr.new().valueOr:
return err("error creating busySignal ThreadSignalPtr in PgAsyncPool: " & $error)

let pool = PgAsyncPool(
connString: connString,
maxConnections: maxConnections,
state: PgAsyncPoolState.Live,
conns: newSeq[PgDbConn](0),
dir: dir,
busySignal: busySignal,
)

return ok(pool)
Expand Down Expand Up @@ -92,7 +104,8 @@ proc close*(pool: PgAsyncPool): Future[Result[void, string]] {.async.} =
# wait for the connections to be released and close them, without
# blocking the async runtime
while pool.conns.anyIt(it.busy):
await sleepAsync(0.milliseconds)
debug "closing waiting while connections are busy"
await pool.busySignal.wait()

for i in 0 ..< pool.conns.len:
if pool.conns[i].busy:
Expand Down Expand Up @@ -134,9 +147,9 @@ proc getConnIndex(pool: PgAsyncPool): Future[DatabaseResult[int]] {.async.} =

if pool.conns.len == pool.maxConnections:
## Can't create more connections. Wait for a free connection without blocking the async runtime.
while pool.isBusy():
if pool.isBusy():
debug "getConnIndex", dir = pool.dir, conns = $pool.conns
await sleepAsync(0.milliseconds)
await pool.busySignal.wait()

return pool.getFirstFreeConnIndex()
elif pool.conns.len < pool.maxConnections:
Expand Down Expand Up @@ -174,6 +187,10 @@ proc releaseConn(pool: PgAsyncPool, conn: DbConn) =
for i in 0 ..< pool.conns.len:
if pool.conns[i].dbConn == conn:
pool.conns[i].busy = false

pool.busySignal.fireSync().isOkOr:
error "error triggering busySignal in releaseConn", error = $error

debug "after releaseConn", conns = $pool.conns

const SlowQueryThresholdInNanoSeconds = 1_000_000_000
Expand Down

0 comments on commit d3a33da

Please sign in to comment.